Skip to content

Commit

Permalink
rfe(replay): Stop recording when hitting a rate limit
Browse files Browse the repository at this point in the history
  • Loading branch information
Lms24 committed Feb 1, 2023
1 parent f352f97 commit d074416
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 241 deletions.
34 changes: 1 addition & 33 deletions packages/replay/src/replay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
import { EventType, record } from '@sentry-internal/rrweb';
import { captureException } from '@sentry/core';
import type { Breadcrumb, ReplayRecordingMode } from '@sentry/types';
import type { RateLimits } from '@sentry/utils';
import { disabledUntil, logger } from '@sentry/utils';
import { logger } from '@sentry/utils';

import {
ERROR_CHECKOUT_TIME,
Expand Down Expand Up @@ -40,7 +39,6 @@ import { isExpired } from './util/isExpired';
import { isSessionExpired } from './util/isSessionExpired';
import { overwriteRecordDroppedEvent, restoreRecordDroppedEvent } from './util/monkeyPatchRecordDroppedEvent';
import { sendReplay } from './util/sendReplay';
import { RateLimitError } from './util/sendReplayRequest';

/**
* The main replay container class, which holds all the state and methods for recording and sending replays.
Expand Down Expand Up @@ -809,11 +807,6 @@ export class ReplayContainer implements ReplayContainerInterface {
} catch (err) {
this._handleException(err);

if (err instanceof RateLimitError) {
this._handleRateLimit(err.rateLimits);
return;
}

// This means we retried 3 times, and all of them failed
// In this case, we want to completely stop the replay - otherwise, we may get inconsistent segments
this.stop();
Expand Down Expand Up @@ -873,29 +866,4 @@ export class ReplayContainer implements ReplayContainerInterface {
saveSession(this.session);
}
}

/**
* Pauses the replay and resumes it after the rate-limit duration is over.
*/
private _handleRateLimit(rateLimits: RateLimits): void {
// in case recording is already paused, we don't need to do anything, as we might have already paused because of a
// rate limit
if (this.isPaused()) {
return;
}

const rateLimitEnd = disabledUntil(rateLimits, 'replay');
const rateLimitDuration = rateLimitEnd - Date.now();

if (rateLimitDuration > 0) {
__DEBUG_BUILD__ && logger.warn('[Replay]', `Rate limit hit, pausing replay for ${rateLimitDuration}ms`);
this.pause();
this._debouncedFlush.cancel();

setTimeout(() => {
__DEBUG_BUILD__ && logger.info('[Replay]', 'Resuming replay after rate limit');
this.resume();
}, rateLimitDuration);
}
}
}
4 changes: 2 additions & 2 deletions packages/replay/src/util/sendReplay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { captureException, setContext } from '@sentry/core';

import { RETRY_BASE_INTERVAL, RETRY_MAX_COUNT, UNABLE_TO_SEND_REPLAY } from '../constants';
import type { SendReplayData } from '../types';
import { RateLimitError, sendReplayRequest, TransportStatusCodeError } from './sendReplayRequest';
import { sendReplayRequest, TransportStatusCodeError } from './sendReplayRequest';

/**
* Finalize and send the current replay event to Sentry
Expand All @@ -25,7 +25,7 @@ export async function sendReplay(
await sendReplayRequest(replayData);
return true;
} catch (err) {
if (err instanceof RateLimitError || err instanceof TransportStatusCodeError) {
if (err instanceof TransportStatusCodeError) {
throw err;
}

Expand Down
20 changes: 1 addition & 19 deletions packages/replay/src/util/sendReplayRequest.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { getCurrentHub } from '@sentry/core';
import type { ReplayEvent, TransportMakeRequestResponse } from '@sentry/types';
import type { RateLimits } from '@sentry/utils';
import { isRateLimited, logger, updateRateLimits } from '@sentry/utils';
import { logger } from '@sentry/utils';

import { REPLAY_EVENT_NAME, UNABLE_TO_SEND_REPLAY } from '../constants';
import type { SendReplayData } from '../types';
Expand Down Expand Up @@ -125,11 +124,6 @@ export async function sendReplayRequest({
return response;
}

const rateLimits = updateRateLimits({}, response);
if (isRateLimited(rateLimits, 'replay')) {
throw new RateLimitError(rateLimits);
}

// If the status code is invalid, we want to immediately stop & not retry
if (typeof response.statusCode === 'number' && (response.statusCode < 200 || response.statusCode >= 300)) {
throw new TransportStatusCodeError(response.statusCode);
Expand All @@ -138,18 +132,6 @@ export async function sendReplayRequest({
return response;
}

/**
* This error indicates that we hit a rate limit API error.
*/
export class RateLimitError extends Error {
public rateLimits: RateLimits;

public constructor(rateLimits: RateLimits) {
super('Rate limit hit');
this.rateLimits = rateLimits;
}
}

/**
* This error indicates that the transport returned an invalid status code.
*/
Expand Down
197 changes: 10 additions & 187 deletions packages/replay/test/integration/rateLimiting.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,120 +64,9 @@ describe('Integration | rate-limiting behaviour', () => {
replay && replay.stop();
});

it.each([
{
statusCode: 429,
headers: {
'x-sentry-rate-limits': '30',
'retry-after': null,
},
},
{
statusCode: 429,
headers: {
'x-sentry-rate-limits': '30:replay',
'retry-after': null,
},
},
{
statusCode: 429,
headers: {
'x-sentry-rate-limits': null,
'retry-after': '30',
},
},
] as TransportMakeRequestResponse[])(
'pauses recording and flushing a rate limit is hit and resumes both after the rate limit duration is over %j',
async rateLimitResponse => {
expect(replay.session?.segmentId).toBe(0);
jest.spyOn(replay, 'pause');
jest.spyOn(replay, 'resume');
// @ts-ignore private API
jest.spyOn(replay, '_handleRateLimit');

const TEST_EVENT = { data: {}, timestamp: BASE_TIMESTAMP, type: 2 };

mockTransportSend.mockImplementationOnce(() => {
return Promise.resolve(rateLimitResponse);
});

mockRecord._emitter(TEST_EVENT);

// T = base + 5
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);

expect(mockRecord.takeFullSnapshot).not.toHaveBeenCalled();
expect(mockTransportSend).toHaveBeenCalledTimes(1);
expect(replay).toHaveLastSentReplay({ recordingData: JSON.stringify([TEST_EVENT]) });

expect(replay['_handleRateLimit']).toHaveBeenCalledTimes(1);
// resume() was called once before we even started
expect(replay.resume).not.toHaveBeenCalled();
expect(replay.pause).toHaveBeenCalledTimes(1);

// No user activity to trigger an update
expect(replay.session?.lastActivity).toBe(BASE_TIMESTAMP);
expect(replay.session?.segmentId).toBe(1);

// let's simulate the rate-limit time of inactivity (30secs) and check that we don't do anything in the meantime
const TEST_EVENT2 = { data: {}, timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY, type: 3 };
for (let i = 0; i < 5; i++) {
const ev = {
...TEST_EVENT2,
timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY * (i + 1),
};
mockRecord._emitter(ev);
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);
expect(replay.isPaused()).toBe(true);
expect(mockSendReplayRequest).toHaveBeenCalledTimes(1);
expect(mockTransportSend).toHaveBeenCalledTimes(1);
}

// T = base + 35
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);

// now, recording should resume and first, we expect a checkout event to be sent, as resume()
// should trigger a full snapshot
expect(replay.resume).toHaveBeenCalledTimes(1);
expect(replay.isPaused()).toBe(false);

expect(mockSendReplayRequest).toHaveBeenCalledTimes(2);
expect(replay).toHaveLastSentReplay({
recordingData: JSON.stringify([
{ data: { isCheckout: true }, timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY * 7, type: 2 },
]),
});

// and let's also emit a new event and check that it is recorded
const TEST_EVENT3 = {
data: {},
timestamp: BASE_TIMESTAMP + 7 * DEFAULT_FLUSH_MIN_DELAY,
type: 3,
};
mockRecord._emitter(TEST_EVENT3);

// T = base + 40
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);
expect(mockSendReplayRequest).toHaveBeenCalledTimes(3);
expect(replay).toHaveLastSentReplay({ recordingData: JSON.stringify([TEST_EVENT3]) });

// nothing should happen afterwards
// T = base + 60
await advanceTimers(20_000);
expect(mockSendReplayRequest).toHaveBeenCalledTimes(3);
expect(replay).toHaveLastSentReplay({ recordingData: JSON.stringify([TEST_EVENT3]) });

// events array should be empty
expect(replay.eventBuffer?.pendingLength).toBe(0);
},
);

it('handles rate-limits from a plain 429 response without any retry time', async () => {
it('handles rate-limit 429 responses by stopping the replay', async () => {
expect(replay.session?.segmentId).toBe(0);
jest.spyOn(replay, 'pause');
jest.spyOn(replay, 'resume');
// @ts-ignore private API
jest.spyOn(replay, '_handleRateLimit');
jest.spyOn(replay, 'stop');

const TEST_EVENT = { data: {}, timestamp: BASE_TIMESTAMP, type: 2 };

Expand All @@ -194,46 +83,23 @@ describe('Integration | rate-limiting behaviour', () => {
expect(mockTransportSend).toHaveBeenCalledTimes(1);
expect(replay).toHaveLastSentReplay({ recordingData: JSON.stringify([TEST_EVENT]) });

expect(replay['_handleRateLimit']).toHaveBeenCalledTimes(1);
// resume() was called once before we even started
expect(replay.resume).not.toHaveBeenCalled();
expect(replay.pause).toHaveBeenCalledTimes(1);
expect(replay.stop).toHaveBeenCalledTimes(1);

// No user activity to trigger an update
expect(replay.session?.lastActivity).toBe(BASE_TIMESTAMP);
expect(replay.session?.segmentId).toBe(1);

// let's simulate the rate-limit time of inactivity (60secs) and check that we don't do anything in the meantime
// let's simulate the default rate-limit time of inactivity (60secs) and check that we
// don't do anything in the meantime or after the time has passed
// 60secs are the default we fall back to in the plain 429 case in updateRateLimits()
const TEST_EVENT2 = { data: {}, timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY, type: 3 };
for (let i = 0; i < 11; i++) {
const ev = {
...TEST_EVENT2,
timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY * (i + 1),
};
mockRecord._emitter(ev);
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);
expect(replay.isPaused()).toBe(true);
expect(mockSendReplayRequest).toHaveBeenCalledTimes(1);
expect(mockTransportSend).toHaveBeenCalledTimes(1);
}

// T = base + 60
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);

// now, recording should resume and first, we expect a checkout event to be sent, as resume()
// should trigger a full snapshot
expect(replay.resume).toHaveBeenCalledTimes(1);
expect(replay.isPaused()).toBe(false);
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY * 12);

expect(mockSendReplayRequest).toHaveBeenCalledTimes(2);
expect(replay).toHaveLastSentReplay({
recordingData: JSON.stringify([
{ data: { isCheckout: true }, timestamp: BASE_TIMESTAMP + DEFAULT_FLUSH_MIN_DELAY * 13, type: 2 },
]),
});
expect(mockSendReplayRequest).toHaveBeenCalledTimes(1);
expect(mockTransportSend).toHaveBeenCalledTimes(1);

// and let's also emit a new event and check that it is recorded
// and let's also emit a new event and check that it is not recorded
const TEST_EVENT3 = {
data: {},
timestamp: BASE_TIMESTAMP + 7 * DEFAULT_FLUSH_MIN_DELAY,
Expand All @@ -243,50 +109,7 @@ describe('Integration | rate-limiting behaviour', () => {

// T = base + 65
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);
expect(mockSendReplayRequest).toHaveBeenCalledTimes(3);
expect(replay).toHaveLastSentReplay({ recordingData: JSON.stringify([TEST_EVENT3]) });

// nothing should happen afterwards
// T = base + 85
await advanceTimers(20_000);
expect(mockSendReplayRequest).toHaveBeenCalledTimes(3);
expect(replay).toHaveLastSentReplay({ recordingData: JSON.stringify([TEST_EVENT3]) });

// events array should be empty
expect(replay.eventBuffer?.pendingLength).toBe(0);
});

it("doesn't do anything, if a rate limit is hit and recording is already paused", async () => {
let paused = false;
expect(replay.session?.segmentId).toBe(0);
jest.spyOn(replay, 'isPaused').mockImplementation(() => {
return paused;
});
jest.spyOn(replay, 'pause');
jest.spyOn(replay, 'resume');
// @ts-ignore private API
jest.spyOn(replay, '_handleRateLimit');

const TEST_EVENT = { data: {}, timestamp: BASE_TIMESTAMP, type: 2 };

mockTransportSend.mockImplementationOnce(() => {
return Promise.resolve({ statusCode: 429 });
});

mockRecord._emitter(TEST_EVENT);
paused = true;

// T = base + 5
await advanceTimers(DEFAULT_FLUSH_MIN_DELAY);

expect(mockRecord.takeFullSnapshot).not.toHaveBeenCalled();
expect(mockSendReplayRequest).toHaveBeenCalledTimes(1);
expect(mockTransportSend).toHaveBeenCalledTimes(1);

expect(replay).toHaveLastSentReplay({ recordingData: JSON.stringify([TEST_EVENT]) });

expect(replay['_handleRateLimit']).toHaveBeenCalledTimes(1);
expect(replay.resume).not.toHaveBeenCalled();
expect(replay.isPaused).toHaveBeenCalledTimes(2);
expect(replay.pause).not.toHaveBeenCalled();
});
});

0 comments on commit d074416

Please sign in to comment.