✨ add sendWithRetryStrategy
bcaudan committed Aug 29, 2022
1 parent 60444e2 commit 4e28db2
Showing 4 changed files with 371 additions and 1 deletion.
1 change: 1 addition & 0 deletions packages/core/src/tools/utils.ts
@@ -6,6 +6,7 @@ export const ONE_HOUR = 60 * ONE_MINUTE
 export const ONE_DAY = 24 * ONE_HOUR
 export const ONE_YEAR = 365 * ONE_DAY
 export const ONE_KILO_BYTE = 1024
+export const ONE_MEGA_BYTE = 1024 * ONE_KILO_BYTE
 
 export const enum DOM_EVENT {
   BEFORE_UNLOAD = 'beforeunload',
12 changes: 11 additions & 1 deletion packages/core/src/transport/httpRequest.ts
@@ -1,6 +1,8 @@
 import type { EndpointBuilder } from '../domain/configuration'
 import { addTelemetryError } from '../domain/telemetry'
 import { monitor } from '../tools/monitor'
+import { isExperimentalFeatureEnabled } from '../domain/configuration'
+import { newRetryState, sendWithRetryStrategy } from './sendWithRetryStrategy'
 
 /**
  * Use POST request without content type to:
@@ -22,9 +24,17 @@ export interface Payload {
 }
 
 export function createHttpRequest(endpointBuilder: EndpointBuilder, bytesLimit: number) {
+  const retryState = newRetryState()
+  const sendStrategyForRetry = (payload: Payload, onResponse: (r: HttpResponse) => void) =>
+    fetchKeepAliveStrategy(endpointBuilder, bytesLimit, payload, onResponse)
+
   return {
     send: (payload: Payload) => {
-      fetchKeepAliveStrategy(endpointBuilder, bytesLimit, payload)
+      if (!isExperimentalFeatureEnabled('retry')) {
+        fetchKeepAliveStrategy(endpointBuilder, bytesLimit, payload)
+      } else {
+        sendWithRetryStrategy(payload, retryState, sendStrategyForRetry)
+      }
     },
     /**
      * Since fetch keepalive behaves like regular fetch on Firefox,
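As a side note, here is a minimal sketch of how a transport consumer could exercise the updated send path, assuming the 'retry' experimental feature is enabled and that an EndpointBuilder instance is available from the configuration (both are assumptions for illustration, not part of this diff):

import { createHttpRequest } from './httpRequest'
import { ONE_KILO_BYTE } from '../tools/utils'
import type { EndpointBuilder } from '../domain/configuration'

// Assumed to be provided by the SDK configuration; declared here only for the sketch.
declare const endpointBuilder: EndpointBuilder

const httpRequest = createHttpRequest(endpointBuilder, 16 * ONE_KILO_BYTE)

// With the 'retry' experimental feature enabled, this call goes through sendWithRetryStrategy;
// otherwise it falls back to fetchKeepAliveStrategy, as before this commit.
httpRequest.send({ data: '{"message":"hello"}', bytesCount: 19 })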
212 changes: 212 additions & 0 deletions packages/core/src/transport/sendWithRetryStrategy.spec.ts
@@ -0,0 +1,212 @@
import { mockClock } from '../../test/specHelper'
import type { Clock } from '../../test/specHelper'
import { ONE_SECOND } from '../tools/utils'
import type { RetryState } from './sendWithRetryStrategy'
import {
  newRetryState,
  sendWithRetryStrategy,
  MAX_ONGOING_BYTES_COUNT,
  MAX_ONGOING_REQUESTS,
  MAX_QUEUE_BYTES_COUNT,
} from './sendWithRetryStrategy'
import type { Payload, HttpResponse } from './httpRequest'

describe('sendWithRetryStrategy', () => {
  let sendStub: ReturnType<typeof newSendStub>
  let state: RetryState
  let sendRequest: (payload?: Partial<Payload>) => void
  let clock: Clock

  function newSendStub() {
    const requests: Array<(r: HttpResponse) => void> = []
    return {
      sendStrategy: (_: Payload, onResponse: (r: HttpResponse) => void) => {
        requests.push(onResponse)
      },
      respondWith: (index: number, r: HttpResponse) => {
        requests[index](r)
        requests[index] = () => {
          throw new Error('response already handled')
        }
      },
    }
  }

  beforeEach(() => {
    sendStub = newSendStub()
    state = newRetryState()
    clock = mockClock()
    sendRequest = (payload) => {
      const effectivePayload = {
        data: payload?.data ?? 'a',
        bytesCount: payload?.bytesCount ?? 1,
      }
      sendWithRetryStrategy(effectivePayload, state, sendStub.sendStrategy)
    }
  })

  afterEach(() => {
    clock.cleanup()
  })

  describe('nominal cases:', () => {
    it('should send a request when no bandwidth limit is reached', () => {
      sendRequest()
      expect(state.bandwidthMonitor.ongoingRequestCount).toBe(1)
      expect(state.queuedPayloads.size()).toBe(0)

      sendStub.respondWith(0, { status: 200 })
      expect(state.bandwidthMonitor.ongoingRequestCount).toBe(0)
      expect(state.queuedPayloads.size()).toBe(0)
    })

    it('should allow sending a request payload greater than the bandwidth limit', () => {
      sendRequest({ bytesCount: MAX_ONGOING_BYTES_COUNT + 10 })
      expect(state.bandwidthMonitor.ongoingRequestCount).toBe(1)
      expect(state.queuedPayloads.size()).toBe(0)
    })

    it('should send concurrent requests', () => {
      sendRequest()
      sendRequest()
      sendRequest()
      expect(state.bandwidthMonitor.ongoingRequestCount).toBe(3)
      expect(state.queuedPayloads.size()).toBe(0)

      sendStub.respondWith(0, { status: 200 })
      expect(state.bandwidthMonitor.ongoingRequestCount).toBe(2)

      sendStub.respondWith(1, { status: 200 })
      expect(state.bandwidthMonitor.ongoingRequestCount).toBe(1)

      sendStub.respondWith(2, { status: 200 })
      expect(state.bandwidthMonitor.ongoingRequestCount).toBe(0)
    })
  })

  describe('bandwidth limitation:', () => {
    it('should queue a request when its payload would overflow the bytes limit', () => {
      sendRequest({ bytesCount: MAX_ONGOING_BYTES_COUNT - 10 })
      expect(state.bandwidthMonitor.ongoingRequestCount).toBe(1)
      expect(state.queuedPayloads.size()).toBe(0)

      sendRequest({ bytesCount: 11 })
      expect(state.bandwidthMonitor.ongoingRequestCount).toBe(1)
      expect(state.queuedPayloads.size()).toBe(1)
    })

    it('should queue a request when there are too many ongoing requests', () => {
      for (let i = 1; i <= MAX_ONGOING_REQUESTS; i++) {
        sendRequest()
      }
      expect(state.bandwidthMonitor.ongoingRequestCount).toBe(MAX_ONGOING_REQUESTS)
      expect(state.queuedPayloads.size()).toBe(0)

      sendRequest()
      expect(state.queuedPayloads.size()).toBe(1)
    })

    it('should not queue a payload bigger than the queue limit', () => {
      sendRequest()
      expect(state.bandwidthMonitor.ongoingRequestCount).toBe(1)
      expect(state.queuedPayloads.size()).toBe(0)

      sendRequest({ bytesCount: MAX_QUEUE_BYTES_COUNT + 1 })
      expect(state.bandwidthMonitor.ongoingRequestCount).toBe(1)
      expect(state.queuedPayloads.size()).toBe(0)
    })
  })

  describe('dequeue:', () => {
    it('should send as many queued requests as possible after a successful request', () => {
      sendRequest({ bytesCount: MAX_ONGOING_BYTES_COUNT })
      for (let i = 1; i <= MAX_ONGOING_REQUESTS; i++) {
        sendRequest()
      }
      expect(state.bandwidthMonitor.ongoingRequestCount).toBe(1)
      expect(state.queuedPayloads.size()).toBe(MAX_ONGOING_REQUESTS)

      sendStub.respondWith(0, { status: 200 })
      expect(state.bandwidthMonitor.ongoingRequestCount).toBe(MAX_ONGOING_REQUESTS)
      expect(state.queuedPayloads.size()).toBe(0)
    })

    it('should remove the oldest payloads when a new payload would overflow the queue bytes limit', () => {
      sendRequest({ bytesCount: MAX_ONGOING_BYTES_COUNT })
      sendRequest({ bytesCount: MAX_QUEUE_BYTES_COUNT - 10 })
      sendRequest({ bytesCount: 10 })
      expect(state.queuedPayloads.size()).toBe(2)
      expect(state.queuedPayloads.bytesCount).toBe(MAX_QUEUE_BYTES_COUNT)

      sendRequest({ bytesCount: 1 })
      expect(state.queuedPayloads.size()).toBe(2)
      expect(state.queuedPayloads.bytesCount).toBe(11)
    })
  })

  describe('when a request fails:', () => {
    it('should start queueing following requests', () => {
      sendRequest()
      sendStub.respondWith(0, { status: 500 })
      expect(state.queuedPayloads.size()).toBe(1)

      sendRequest()
      expect(state.queuedPayloads.size()).toBe(2)
      sendRequest()
      expect(state.queuedPayloads.size()).toBe(3)
    })

    it('should send queued requests if another ongoing request succeeds', () => {
      sendRequest()
      sendRequest()
      sendStub.respondWith(0, { status: 500 })
      expect(state.bandwidthMonitor.ongoingRequestCount).toBe(1)
      expect(state.queuedPayloads.size()).toBe(1)

      sendRequest()
      expect(state.bandwidthMonitor.ongoingRequestCount).toBe(1)
      expect(state.queuedPayloads.size()).toBe(2)

      sendStub.respondWith(1, { status: 200 })
      expect(state.bandwidthMonitor.ongoingRequestCount).toBe(2)
      expect(state.queuedPayloads.size()).toBe(0)
    })
  })

  describe('when the intake is offline:', () => {
    it('should regularly try to send the first queued request', () => {
      sendRequest()
      sendStub.respondWith(0, { status: 500 })
      expect(state.bandwidthMonitor.ongoingRequestCount).toBe(0)

      clock.tick(ONE_SECOND)
      expect(state.bandwidthMonitor.ongoingRequestCount).toBe(1)
      sendStub.respondWith(1, { status: 500 })

      clock.tick(2 * ONE_SECOND)
      expect(state.bandwidthMonitor.ongoingRequestCount).toBe(1)
      sendStub.respondWith(2, { status: 500 })

      clock.tick(4 * ONE_SECOND)
      expect(state.bandwidthMonitor.ongoingRequestCount).toBe(1)
      sendStub.respondWith(3, { status: 500 })
    })

    it('should send queued requests after the first successful request', () => {
      sendRequest()
      sendStub.respondWith(0, { status: 500 })
      sendRequest()
      sendRequest()
      sendRequest()
      expect(state.bandwidthMonitor.ongoingRequestCount).toBe(0)
      expect(state.queuedPayloads.size()).toBe(4)

      clock.tick(ONE_SECOND)
      expect(state.bandwidthMonitor.ongoingRequestCount).toBe(1)
      sendStub.respondWith(1, { status: 200 })

      expect(state.bandwidthMonitor.ongoingRequestCount).toBe(3)
      expect(state.queuedPayloads.size()).toBe(0)
    })
  })
})
147 changes: 147 additions & 0 deletions packages/core/src/transport/sendWithRetryStrategy.ts
@@ -0,0 +1,147 @@
import { monitor } from '../tools/monitor'
import { ONE_KILO_BYTE, ONE_MEGA_BYTE, ONE_SECOND } from '../tools/utils'
import type { Payload, HttpResponse } from './httpRequest'

export const MAX_ONGOING_BYTES_COUNT = 80 * ONE_KILO_BYTE
export const MAX_ONGOING_REQUESTS = 32
export const MAX_QUEUE_BYTES_COUNT = 3 * ONE_MEGA_BYTE
export const MAX_BACKOFF_TIME = 256 * ONE_SECOND

export interface RetryState {
  isIntakeAvailable: boolean
  currentBackoffTime: number
  bandwidthMonitor: ReturnType<typeof newBandwidthMonitor>
  queuedPayloads: ReturnType<typeof newPayloadQueue>
}

type SendStrategy = (payload: Payload, onResponse: (r: HttpResponse) => void) => void

export function sendWithRetryStrategy(payload: Payload, state: RetryState, sendStrategy: SendStrategy) {
  if (state.isIntakeAvailable && state.bandwidthMonitor.canHandle(payload)) {
    send(payload, state, sendStrategy, {
      onSuccess: () => sendNextPayload(state, sendStrategy),
      onFailure: () => {
        state.queuedPayloads.enqueue(payload)
        scheduleRetry(state, sendStrategy)
      },
    })
    sendNextPayload(state, sendStrategy)
  } else {
    state.queuedPayloads.enqueue(payload)
  }
}

function scheduleRetry(state: RetryState, sendStrategy: SendStrategy) {
  if (state.bandwidthMonitor.ongoingRequestCount !== 0) {
    // avoid entering a retry phase if another ongoing request can still succeed
    return
  }
  setTimeout(
    monitor(() => {
      const payload = state.queuedPayloads.first()
      send(payload, state, sendStrategy, {
        onSuccess: () => {
          state.queuedPayloads.dequeue()
          state.currentBackoffTime = ONE_SECOND
          sendNextPayload(state, sendStrategy)
        },
        onFailure: () => {
          state.currentBackoffTime = Math.min(MAX_BACKOFF_TIME, state.currentBackoffTime * 2)
          scheduleRetry(state, sendStrategy)
        },
      })
    }),
    state.currentBackoffTime
  )
}

function sendNextPayload(state: RetryState, sendStrategy: SendStrategy) {
  const nextPayload = state.queuedPayloads.dequeue()
  if (nextPayload) {
    sendWithRetryStrategy(nextPayload, state, sendStrategy)
  }
}

function send(
  payload: Payload,
  state: RetryState,
  sendStrategy: SendStrategy,
  { onSuccess, onFailure }: { onSuccess: () => void; onFailure: () => void }
) {
  state.bandwidthMonitor.add(payload)
  sendStrategy(payload, (response) => {
    state.bandwidthMonitor.remove(payload)
    if (wasRequestSuccessful(response)) {
      state.isIntakeAvailable = true
      onSuccess()
    } else {
      state.isIntakeAvailable = false
      onFailure()
    }
  })
}

function wasRequestSuccessful(response: HttpResponse) {
  return response.status < 500
}

export function newRetryState(): RetryState {
  return {
    isIntakeAvailable: true,
    currentBackoffTime: ONE_SECOND,
    bandwidthMonitor: newBandwidthMonitor(),
    queuedPayloads: newPayloadQueue(),
  }
}

function newPayloadQueue() {
  const queue: Payload[] = []
  return {
    bytesCount: 0,
    enqueue(payload: Payload) {
      if (payload.bytesCount > MAX_QUEUE_BYTES_COUNT) {
        return
      }
      while (payload.bytesCount + this.bytesCount > MAX_QUEUE_BYTES_COUNT) {
        this.dequeue()
      }
      queue.push(payload)
      this.bytesCount += payload.bytesCount
    },
    first() {
      return queue[0]
    },
    dequeue() {
      const payload = queue.shift()
      if (payload) {
        this.bytesCount -= payload.bytesCount
      }
      return payload
    },
    size() {
      return queue.length
    },
  }
}

function newBandwidthMonitor() {
  return {
    ongoingRequestCount: 0,
    ongoingByteCount: 0,
    canHandle(payload: Payload) {
      return (
        this.ongoingRequestCount === 0 ||
        (this.ongoingByteCount + payload.bytesCount <= MAX_ONGOING_BYTES_COUNT &&
          this.ongoingRequestCount < MAX_ONGOING_REQUESTS)
      )
    },
    add(payload: Payload) {
      this.ongoingRequestCount += 1
      this.ongoingByteCount += payload.bytesCount
    },
    remove(payload: Payload) {
      this.ongoingRequestCount -= 1
      this.ongoingByteCount -= payload.bytesCount
    },
  }
}
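Beyond the unit tests above, a minimal standalone sketch of the failure path may help: it drives the strategy with a hand-rolled send strategy so the intake responses can be controlled manually (the pending/fakeSendStrategy names are illustrative, not part of the commit):

import { newRetryState, sendWithRetryStrategy } from './sendWithRetryStrategy'
import type { Payload, HttpResponse } from './httpRequest'

// Collect response callbacks so the "server" can be driven by hand.
const pending: Array<(r: HttpResponse) => void> = []
const fakeSendStrategy = (_payload: Payload, onResponse: (r: HttpResponse) => void) => {
  pending.push(onResponse)
}

const state = newRetryState()
sendWithRetryStrategy({ data: 'a', bytesCount: 1 }, state, fakeSendStrategy)

// A 5xx response marks the intake as unavailable: the payload is re-queued and a retry
// is scheduled after currentBackoffTime (1 second, doubling up to MAX_BACKOFF_TIME).
pending[0]({ status: 500 })

// While the intake is unavailable, subsequent payloads are queued instead of sent.
sendWithRetryStrategy({ data: 'b', bytesCount: 1 }, state, fakeSendStrategy)
console.log(state.queuedPayloads.size()) // 2: both payloads wait for a successful retry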
