From 4e28db2f6fa458959862b8db4f0498ab95d92f24 Mon Sep 17 00:00:00 2001 From: Bastien Caudan Date: Mon, 29 Aug 2022 17:36:24 +0200 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20add=20sendWithRetryStrategy?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/core/src/tools/utils.ts | 1 + packages/core/src/transport/httpRequest.ts | 12 +- .../transport/sendWithRetryStrategy.spec.ts | 212 ++++++++++++++++++ .../src/transport/sendWithRetryStrategy.ts | 147 ++++++++++++ 4 files changed, 371 insertions(+), 1 deletion(-) create mode 100644 packages/core/src/transport/sendWithRetryStrategy.spec.ts create mode 100644 packages/core/src/transport/sendWithRetryStrategy.ts diff --git a/packages/core/src/tools/utils.ts b/packages/core/src/tools/utils.ts index f9713a3166..167d9b122c 100644 --- a/packages/core/src/tools/utils.ts +++ b/packages/core/src/tools/utils.ts @@ -6,6 +6,7 @@ export const ONE_HOUR = 60 * ONE_MINUTE export const ONE_DAY = 24 * ONE_HOUR export const ONE_YEAR = 365 * ONE_DAY export const ONE_KILO_BYTE = 1024 +export const ONE_MEGA_BYTE = 1024 * ONE_KILO_BYTE export const enum DOM_EVENT { BEFORE_UNLOAD = 'beforeunload', diff --git a/packages/core/src/transport/httpRequest.ts b/packages/core/src/transport/httpRequest.ts index 4a724a69ff..ed7a50f61e 100644 --- a/packages/core/src/transport/httpRequest.ts +++ b/packages/core/src/transport/httpRequest.ts @@ -1,6 +1,8 @@ import type { EndpointBuilder } from '../domain/configuration' import { addTelemetryError } from '../domain/telemetry' import { monitor } from '../tools/monitor' +import { isExperimentalFeatureEnabled } from '../domain/configuration' +import { newRetryState, sendWithRetryStrategy } from './sendWithRetryStrategy' /** * Use POST request without content type to: @@ -22,9 +24,17 @@ export interface Payload { } export function createHttpRequest(endpointBuilder: EndpointBuilder, bytesLimit: number) { + const retryState = newRetryState() + const sendStrategyForRetry = (payload: Payload, onResponse: (r: HttpResponse) => void) => + fetchKeepAliveStrategy(endpointBuilder, bytesLimit, payload, onResponse) + return { send: (payload: Payload) => { - fetchKeepAliveStrategy(endpointBuilder, bytesLimit, payload) + if (!isExperimentalFeatureEnabled('retry')) { + fetchKeepAliveStrategy(endpointBuilder, bytesLimit, payload) + } else { + sendWithRetryStrategy(payload, retryState, sendStrategyForRetry) + } }, /** * Since fetch keepalive behaves like regular fetch on Firefox, diff --git a/packages/core/src/transport/sendWithRetryStrategy.spec.ts b/packages/core/src/transport/sendWithRetryStrategy.spec.ts new file mode 100644 index 0000000000..6dd205f6db --- /dev/null +++ b/packages/core/src/transport/sendWithRetryStrategy.spec.ts @@ -0,0 +1,212 @@ +import { mockClock } from '../../test/specHelper' +import type { Clock } from '../../test/specHelper' +import { ONE_SECOND } from '../tools/utils' +import type { RetryState } from './sendWithRetryStrategy' +import { + newRetryState, + sendWithRetryStrategy, + MAX_ONGOING_BYTES_COUNT, + MAX_ONGOING_REQUESTS, + MAX_QUEUE_BYTES_COUNT, +} from './sendWithRetryStrategy' +import type { Payload, HttpResponse } from './httpRequest' + +describe('sendWithRetryStrategy', () => { + let sendStub: ReturnType + let state: RetryState + let sendRequest: (payload?: Partial) => void + let clock: Clock + + function newSendStub() { + const requests: Array<(r: HttpResponse) => void> = [] + return { + sendStrategy: (_: Payload, onResponse: (r: HttpResponse) => void) => { + requests.push(onResponse) + }, + respondWith: (index: number, r: HttpResponse) => { + requests[index](r) + requests[index] = () => { + throw new Error('response already handled') + } + }, + } + } + + beforeEach(() => { + sendStub = newSendStub() + state = newRetryState() + clock = mockClock() + sendRequest = (payload) => { + const effectivePayload = { + data: payload?.data ?? 'a', + bytesCount: payload?.bytesCount ?? 1, + } + sendWithRetryStrategy(effectivePayload, state, sendStub.sendStrategy) + } + }) + + afterEach(() => { + clock.cleanup() + }) + + describe('nominal cases:', () => { + it('should send request when no bandwidth limit reached', () => { + sendRequest() + expect(state.bandwidthMonitor.ongoingRequestCount).toBe(1) + expect(state.queuedPayloads.size()).toBe(0) + + sendStub.respondWith(0, { status: 200 }) + expect(state.bandwidthMonitor.ongoingRequestCount).toBe(0) + expect(state.queuedPayloads.size()).toBe(0) + }) + + it('should allow to send request payload greater than bandwidth limit', () => { + sendRequest({ bytesCount: MAX_ONGOING_BYTES_COUNT + 10 }) + expect(state.bandwidthMonitor.ongoingRequestCount).toBe(1) + expect(state.queuedPayloads.size()).toBe(0) + }) + + it('should send concurrent requests ', () => { + sendRequest() + sendRequest() + sendRequest() + expect(state.bandwidthMonitor.ongoingRequestCount).toBe(3) + expect(state.queuedPayloads.size()).toBe(0) + + sendStub.respondWith(0, { status: 200 }) + expect(state.bandwidthMonitor.ongoingRequestCount).toBe(2) + + sendStub.respondWith(1, { status: 200 }) + expect(state.bandwidthMonitor.ongoingRequestCount).toBe(1) + + sendStub.respondWith(2, { status: 200 }) + expect(state.bandwidthMonitor.ongoingRequestCount).toBe(0) + }) + }) + + describe('bandwidth limitation:', () => { + it('should queue request when its payload would overflow bytes limit', () => { + sendRequest({ bytesCount: MAX_ONGOING_BYTES_COUNT - 10 }) + expect(state.bandwidthMonitor.ongoingRequestCount).toBe(1) + expect(state.queuedPayloads.size()).toBe(0) + + sendRequest({ bytesCount: 11 }) + expect(state.bandwidthMonitor.ongoingRequestCount).toBe(1) + expect(state.queuedPayloads.size()).toBe(1) + }) + + it('should queue request when too much ongoing requests', () => { + for (let i = 1; i <= MAX_ONGOING_REQUESTS; i++) { + sendRequest() + } + expect(state.bandwidthMonitor.ongoingRequestCount).toBe(MAX_ONGOING_REQUESTS) + expect(state.queuedPayloads.size()).toBe(0) + + sendRequest() + expect(state.queuedPayloads.size()).toBe(1) + }) + + it('should not queue payload bigger than queue limit', () => { + sendRequest() + expect(state.bandwidthMonitor.ongoingRequestCount).toBe(1) + expect(state.queuedPayloads.size()).toBe(0) + + sendRequest({ bytesCount: MAX_QUEUE_BYTES_COUNT + 1 }) + expect(state.bandwidthMonitor.ongoingRequestCount).toBe(1) + expect(state.queuedPayloads.size()).toBe(0) + }) + }) + + describe('dequeue:', () => { + it('should send as much queued request as possible after a successful request', () => { + sendRequest({ bytesCount: MAX_ONGOING_BYTES_COUNT }) + for (let i = 1; i <= MAX_ONGOING_REQUESTS; i++) { + sendRequest() + } + expect(state.bandwidthMonitor.ongoingRequestCount).toBe(1) + expect(state.queuedPayloads.size()).toBe(MAX_ONGOING_REQUESTS) + + sendStub.respondWith(0, { status: 200 }) + expect(state.bandwidthMonitor.ongoingRequestCount).toBe(MAX_ONGOING_REQUESTS) + expect(state.queuedPayloads.size()).toBe(0) + }) + + it('should remove the oldest payloads when a new payload would overflow queue bytes limit', () => { + sendRequest({ bytesCount: MAX_ONGOING_BYTES_COUNT }) + sendRequest({ bytesCount: MAX_QUEUE_BYTES_COUNT - 10 }) + sendRequest({ bytesCount: 10 }) + expect(state.queuedPayloads.size()).toBe(2) + expect(state.queuedPayloads.bytesCount).toBe(MAX_QUEUE_BYTES_COUNT) + + sendRequest({ bytesCount: 1 }) + expect(state.queuedPayloads.size()).toBe(2) + expect(state.queuedPayloads.bytesCount).toBe(11) + }) + }) + + describe('when a request fails:', () => { + it('should start queueing following requests', () => { + sendRequest() + sendStub.respondWith(0, { status: 500 }) + expect(state.queuedPayloads.size()).toBe(1) + + sendRequest() + expect(state.queuedPayloads.size()).toBe(2) + sendRequest() + expect(state.queuedPayloads.size()).toBe(3) + }) + + it('should send queued requests if another ongoing request succeed', () => { + sendRequest() + sendRequest() + sendStub.respondWith(0, { status: 500 }) + expect(state.bandwidthMonitor.ongoingRequestCount).toBe(1) + expect(state.queuedPayloads.size()).toBe(1) + + sendRequest() + expect(state.bandwidthMonitor.ongoingRequestCount).toBe(1) + expect(state.queuedPayloads.size()).toBe(2) + + sendStub.respondWith(1, { status: 200 }) + expect(state.bandwidthMonitor.ongoingRequestCount).toBe(2) + expect(state.queuedPayloads.size()).toBe(0) + }) + }) + + describe('when intake offline:', () => { + it('should regularly try to send first queued request', () => { + sendRequest() + sendStub.respondWith(0, { status: 500 }) + expect(state.bandwidthMonitor.ongoingRequestCount).toBe(0) + + clock.tick(ONE_SECOND) + expect(state.bandwidthMonitor.ongoingRequestCount).toBe(1) + sendStub.respondWith(1, { status: 500 }) + + clock.tick(2 * ONE_SECOND) + expect(state.bandwidthMonitor.ongoingRequestCount).toBe(1) + sendStub.respondWith(2, { status: 500 }) + + clock.tick(4 * ONE_SECOND) + expect(state.bandwidthMonitor.ongoingRequestCount).toBe(1) + sendStub.respondWith(3, { status: 500 }) + }) + + it('should send queued requests after first successful request', () => { + sendRequest() + sendStub.respondWith(0, { status: 500 }) + sendRequest() + sendRequest() + sendRequest() + expect(state.bandwidthMonitor.ongoingRequestCount).toBe(0) + expect(state.queuedPayloads.size()).toBe(4) + + clock.tick(ONE_SECOND) + expect(state.bandwidthMonitor.ongoingRequestCount).toBe(1) + sendStub.respondWith(1, { status: 200 }) + + expect(state.bandwidthMonitor.ongoingRequestCount).toBe(3) + expect(state.queuedPayloads.size()).toBe(0) + }) + }) +}) diff --git a/packages/core/src/transport/sendWithRetryStrategy.ts b/packages/core/src/transport/sendWithRetryStrategy.ts new file mode 100644 index 0000000000..465c1c355d --- /dev/null +++ b/packages/core/src/transport/sendWithRetryStrategy.ts @@ -0,0 +1,147 @@ +import { monitor } from '../tools/monitor' +import { ONE_KILO_BYTE, ONE_MEGA_BYTE, ONE_SECOND } from '../tools/utils' +import type { Payload, HttpResponse } from './httpRequest' + +export const MAX_ONGOING_BYTES_COUNT = 80 * ONE_KILO_BYTE +export const MAX_ONGOING_REQUESTS = 32 +export const MAX_QUEUE_BYTES_COUNT = 3 * ONE_MEGA_BYTE +export const MAX_BACKOFF_TIME = 256 * ONE_SECOND + +export interface RetryState { + isIntakeAvailable: boolean + currentBackoffTime: number + bandwidthMonitor: ReturnType + queuedPayloads: ReturnType +} + +type SendStrategy = (payload: Payload, onResponse: (r: HttpResponse) => void) => void + +export function sendWithRetryStrategy(payload: Payload, state: RetryState, sendStrategy: SendStrategy) { + if (state.isIntakeAvailable && state.bandwidthMonitor.canHandle(payload)) { + send(payload, state, sendStrategy, { + onSuccess: () => sendNextPayload(state, sendStrategy), + onFailure: () => { + state.queuedPayloads.enqueue(payload) + scheduleRetry(state, sendStrategy) + }, + }) + sendNextPayload(state, sendStrategy) + } else { + state.queuedPayloads.enqueue(payload) + } +} + +function scheduleRetry(state: RetryState, sendStrategy: SendStrategy) { + if (state.bandwidthMonitor.ongoingRequestCount !== 0) { + // avoid to enter a retry phase if another ongoing request can succeed + return + } + setTimeout( + monitor(() => { + const payload = state.queuedPayloads.first() + send(payload, state, sendStrategy, { + onSuccess: () => { + state.queuedPayloads.dequeue() + state.currentBackoffTime = ONE_SECOND + sendNextPayload(state, sendStrategy) + }, + onFailure: () => { + state.currentBackoffTime = Math.min(MAX_BACKOFF_TIME, state.currentBackoffTime * 2) + scheduleRetry(state, sendStrategy) + }, + }) + }), + state.currentBackoffTime + ) +} + +function sendNextPayload(state: RetryState, sendStrategy: SendStrategy) { + const nextPayload = state.queuedPayloads.dequeue() + if (nextPayload) { + sendWithRetryStrategy(nextPayload, state, sendStrategy) + } +} + +function send( + payload: Payload, + state: RetryState, + sendStrategy: SendStrategy, + { onSuccess, onFailure }: { onSuccess: () => void; onFailure: () => void } +) { + state.bandwidthMonitor.add(payload) + sendStrategy(payload, (response) => { + state.bandwidthMonitor.remove(payload) + if (wasRequestSuccessful(response)) { + state.isIntakeAvailable = true + onSuccess() + } else { + state.isIntakeAvailable = false + onFailure() + } + }) +} + +function wasRequestSuccessful(response: HttpResponse) { + return response.status < 500 +} + +export function newRetryState(): RetryState { + return { + isIntakeAvailable: true, + currentBackoffTime: ONE_SECOND, + bandwidthMonitor: newBandwidthMonitor(), + queuedPayloads: newPayloadQueue(), + } +} + +function newPayloadQueue() { + const queue: Payload[] = [] + return { + bytesCount: 0, + enqueue(payload: Payload) { + if (payload.bytesCount > MAX_QUEUE_BYTES_COUNT) { + return + } + while (payload.bytesCount + this.bytesCount > MAX_QUEUE_BYTES_COUNT) { + this.dequeue() + } + queue.push(payload) + this.bytesCount += payload.bytesCount + }, + first() { + return queue[0] + }, + dequeue() { + const payload = queue.shift() + if (payload) { + this.bytesCount -= payload.bytesCount + } + return payload + }, + size() { + return queue.length + }, + } +} + +function newBandwidthMonitor() { + return { + ongoingRequestCount: 0, + ongoingByteCount: 0, + canHandle(payload: Payload) { + return ( + this.ongoingRequestCount === 0 || + (this.ongoingByteCount + payload.bytesCount <= MAX_ONGOING_BYTES_COUNT && + this.ongoingRequestCount < MAX_ONGOING_REQUESTS) + ) + }, + add(payload: Payload) { + this.ongoingRequestCount += 1 + this.ongoingByteCount += payload.bytesCount + }, + remove(payload: Payload) { + this.ongoingRequestCount -= 1 + this.ongoingByteCount -= payload.bytesCount + }, + } +}