Skip to content

Commit

Permalink
⚡️[RUMF-1111] limit the number of bytes read from the response (#1264)
Browse files Browse the repository at this point in the history
* ⚡️[RUMF-1111] limit the number of bytes read from the response

* [RUMF-1111] add support for Edge 18 and lower

* 👌 use a more explicit `truncateResponseStream` function

* 👌 rename partialBuffers to chunks

* 👌 no need to slice chunk

* 👌 encapsulate the "exceeded limit" logic in `readBytes`

* 👌 rephrase comment a bit

* 👌 add a test on cancel returning a rejected promise

* 👌 exclicitly states that it is an optimization

Co-authored-by: Bastien Caudan <bastien.caudan@datadoghq.com>

* 👌 add a E2E test

* move the limit to logs

* 👌 make sure the amount of bytes sent is close to the limit

* 👌 remove assertion with server-side limit

* 👌 move comment

* ✅ increase limit

The limit might not be enough in BS. It might depend on the network
latency and cause flakiness... we'll see

Co-authored-by: Bastien Caudan <bastien.caudan@datadoghq.com>
  • Loading branch information
BenoitZugmeyer and bcaudan authored Jan 17, 2022
1 parent 4ca0ed8 commit 486faa2
Show file tree
Hide file tree
Showing 9 changed files with 339 additions and 47 deletions.
6 changes: 0 additions & 6 deletions packages/core/src/domain/configuration/configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ export interface Configuration extends TransportConfiguration {
// Event limits
eventRateLimiterThreshold: number // Limit the maximum number of actions, errors and logs per minutes
maxInternalMonitoringMessagesPerPage: number
requestErrorResponseLengthLimit: number

// Batch configuration
batchBytesLimit: number
Expand Down Expand Up @@ -108,11 +107,6 @@ export function validateAndBuildConfiguration(
eventRateLimiterThreshold: 3000,
maxInternalMonitoringMessagesPerPage: 15,

/**
* arbitrary value, byte precision not needed
*/
requestErrorResponseLengthLimit: 32 * ONE_KILO_BYTE,

/**
* flush automatically, aim to be lower than ALB connection timeout
* to maximize connection reuse.
Expand Down
42 changes: 25 additions & 17 deletions packages/core/test/specHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,33 @@ export interface ResponseStubOptions {
type?: ResponseType
responseText?: string
responseTextError?: Error
body?: ReadableStream<Uint8Array>
}
function notYetImplemented(): never {
throw new Error('not yet implemented')
}

export class ResponseStub implements Response {
private _bodyUsed = false

constructor(private options: Readonly<ResponseStubOptions>) {}
private _body: ReadableStream<Uint8Array> | undefined

constructor(private options: Readonly<ResponseStubOptions>) {
if (this.options.body) {
this._body = this.options.body
} else if (this.options.responseTextError !== undefined) {
this._body = new ReadableStream({
start: (controller) => {
controller.error(this.options.responseTextError)
},
})
} else if (this.options.responseText !== undefined) {
this._body = new ReadableStream({
start: (controller) => {
controller.enqueue(new TextEncoder().encode(this.options.responseText))
controller.close()
},
})
}
}

get status() {
return this.options.status ?? 200
Expand All @@ -167,19 +185,11 @@ export class ResponseStub implements Response {
}

get bodyUsed() {
return this._bodyUsed
return this._body ? this._body.locked : false
}

text() {
if (this.bodyUsed) {
return Promise.reject(new TypeError("Failed to execute 'text' on 'Response': body stream already read"))
}
this._bodyUsed = true
if (this.options.responseTextError !== undefined) {
return Promise.reject(this.options.responseTextError)
}

return Promise.resolve(this.options.responseText ?? '')
get body() {
return this._body || null
}

clone() {
Expand All @@ -192,6 +202,7 @@ export class ResponseStub implements Response {
// Partial implementation, feel free to implement
/* eslint-disable @typescript-eslint/member-ordering */
arrayBuffer = notYetImplemented
text = notYetImplemented
blob = notYetImplemented
formData = notYetImplemented
json = notYetImplemented
Expand All @@ -214,9 +225,6 @@ export class ResponseStub implements Response {
get url() {
return notYetImplemented()
}
get body() {
return notYetImplemented()
}
}

export type FetchStub = (input: RequestInfo, init?: RequestInit) => FetchStubPromise
Expand Down
10 changes: 9 additions & 1 deletion packages/logs/src/domain/configuration.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { Configuration, InitConfiguration } from '@datadog/browser-core'
import { validateAndBuildConfiguration } from '@datadog/browser-core'
import { ONE_KILO_BYTE, validateAndBuildConfiguration } from '@datadog/browser-core'
import { buildEnv } from '../boot/buildEnv'
import type { LogsEvent } from '../logsEvent.types'

Expand All @@ -12,8 +12,14 @@ export type HybridInitConfiguration = Omit<LogsInitConfiguration, 'clientToken'>

export interface LogsConfiguration extends Configuration {
forwardErrorsToLogs: boolean
requestErrorResponseLengthLimit: number
}

/**
* arbitrary value, byte precision not needed
*/
export const DEFAULT_REQUEST_ERROR_RESPONSE_LENGTH_LIMIT = 32 * ONE_KILO_BYTE

export function validateAndBuildLogsConfiguration(
initConfiguration: LogsInitConfiguration
): LogsConfiguration | undefined {
Expand All @@ -26,5 +32,7 @@ export function validateAndBuildLogsConfiguration(
...baseConfiguration,

forwardErrorsToLogs: !!initConfiguration.forwardErrorsToLogs,

requestErrorResponseLengthLimit: DEFAULT_REQUEST_ERROR_RESPONSE_LENGTH_LIMIT,
}
}
72 changes: 67 additions & 5 deletions packages/logs/src/domain/trackNetworkError.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,19 @@ describe('computeXhrResponseData', () => {
})

describe('computeFetchResponseText', () => {
let onunhandledrejectionSpy: jasmine.Spy

beforeEach(() => {
if (isIE()) {
pending('IE does not support the fetch API')
}

onunhandledrejectionSpy = jasmine.createSpy()
window.onunhandledrejection = onunhandledrejectionSpy
})

afterEach(() => {
window.onunhandledrejection = null
})

it('computes response text from Response objects', (done) => {
Expand Down Expand Up @@ -185,16 +194,69 @@ describe('computeFetchResponseText', () => {
})
})

it('truncates fetch response text', (done) => {
it('reads a limited amount of bytes from the response', (done) => {
// Creates a response that stream "f" indefinitely, one byte at a time
const cancelSpy = jasmine.createSpy()
const pullSpy = jasmine.createSpy().and.callFake((controller: ReadableStreamDefaultController<Uint8Array>) => {
controller.enqueue(new TextEncoder().encode('f'))
})
const response = new ResponseStub({
body: new ReadableStream({
pull: pullSpy,
cancel: cancelSpy,
}),
})

computeFetchResponseText(response, CONFIGURATION, () => {
expect(pullSpy).toHaveBeenCalledTimes(
// readLimitedAmountOfBytes may read one more byte than necessary to make sure it exceeds the limit
CONFIGURATION.requestErrorResponseLengthLimit + 1
)
expect(cancelSpy).toHaveBeenCalledTimes(1)
done()
})
})

it('truncates the response if its size is greater than the limit', (done) => {
const text = 'foobar'
computeFetchResponseText(
new ResponseStub({ responseText: 'Lorem ipsum dolor sit amet orci aliquam.' }),
{ ...CONFIGURATION, requestErrorResponseLengthLimit: 32 },
(responseText) => {
expect(responseText).toBe('Lorem ipsum dolor sit amet orci ...')
new ResponseStub({ responseText: text }),
{ ...CONFIGURATION, requestErrorResponseLengthLimit: text.length - 1 },
(responseData) => {
expect(responseData).toBe('fooba...')
done()
}
)
})

it('does not truncate the response if its size is equal to the limit', (done) => {
const text = 'foo'
computeFetchResponseText(
new ResponseStub({ responseText: text }),
{ ...CONFIGURATION, requestErrorResponseLengthLimit: text.length },
(responseData) => {
expect(responseData).toBe(text)
done()
}
)
})

it('does not yield an unhandled rejection error if the cancel promise is rejected', (done) => {
// Creates a response that stream "f" indefinitely and fails to be canceled
const response = new ResponseStub({
body: new ReadableStream({
pull: (controller) => controller.enqueue(new TextEncoder().encode('f')),
cancel: () => Promise.reject(new Error('foo')),
}),
})

computeFetchResponseText(response, CONFIGURATION, () => {
setTimeout(() => {
expect(onunhandledrejectionSpy).not.toHaveBeenCalled()
done()
})
})
})
})

describe('computeFetchErrorText', () => {
Expand Down
137 changes: 131 additions & 6 deletions packages/logs/src/domain/trackNetworkError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
computeStackTrace,
toStackTraceString,
monitor,
noop,
} from '@datadog/browser-core'
import type { LogsConfiguration } from './configuration'

Expand Down Expand Up @@ -86,13 +87,56 @@ export function computeFetchResponseText(
configuration: LogsConfiguration,
callback: (responseText?: string) => void
) {
response
.clone()
.text()
.then(
monitor((text) => callback(truncateResponseText(text, configuration))),
monitor((error) => callback(`Unable to retrieve response: ${error as string}`))
if (!window.TextDecoder) {
// If the browser doesn't support TextDecoder, let's read the whole response then truncate it.
//
// This should only be the case on early versions of Edge (before they migrated to Chromium).
// Even if it could be possible to implement a workaround for the missing TextDecoder API (using
// a Blob and FileReader), we found another issue preventing us from reading only the first
// bytes from the response: contrary to other browsers, when reading from the cloned response,
// if the original response gets canceled, the cloned response is also canceled and we can't
// know about it. In the following illustration, the promise returned by `reader.read()` may
// never be fulfilled:
//
// fetch('/').then((response) => {
// const reader = response.clone().body.getReader()
// readMore()
// function readMore() {
// reader.read().then(
// (result) => {
// if (result.done) {
// console.log('done')
// } else {
// readMore()
// }
// },
// () => console.log('error')
// )
// }
// response.body.getReader().cancel()
// })
response
.clone()
.text()
.then(
monitor((text) => callback(truncateResponseText(text, configuration))),
monitor((error) => callback(`Unable to retrieve response: ${error as string}`))
)
} else if (!response.body) {
callback()
} else {
truncateResponseStream(
response.clone().body!,
configuration.requestErrorResponseLengthLimit,
(error, responseText) => {
if (error) {
callback(`Unable to retrieve response: ${(error as unknown) as string}`)
} else {
callback(responseText)
}
}
)
}
}

function isRejected(request: { status: number; responseType?: string }) {
Expand All @@ -116,3 +160,84 @@ function format(type: RequestType) {
}
return 'Fetch'
}

function truncateResponseStream(
stream: ReadableStream<Uint8Array>,
limit: number,
callback: (error?: Error, responseText?: string) => void
) {
readLimitedAmountOfBytes(stream, limit, (error, bytes, limitExceeded) => {
if (error) {
callback(error)
} else {
let responseText = new TextDecoder().decode(bytes)
if (limitExceeded) {
responseText += '...'
}
callback(undefined, responseText)
}
})
}

/**
* Read bytes from a ReadableStream until at least `limit` bytes have been read (or until the end of
* the stream). The callback is invoked with the at most `limit` bytes, and indicates that the limit
* has been exceeded if more bytes were available.
*/
function readLimitedAmountOfBytes(
stream: ReadableStream<Uint8Array>,
limit: number,
callback: (error?: Error, bytes?: Uint8Array, limitExceeded?: boolean) => void
) {
const reader = stream.getReader()
const chunks: Uint8Array[] = []
let readBytesCount = 0

readMore()

function readMore() {
reader.read().then(
monitor((result: ReadableStreamReadResult<Uint8Array>) => {
if (result.done) {
onDone()
return
}

chunks.push(result.value)
readBytesCount += result.value.length

if (readBytesCount > limit) {
onDone()
} else {
readMore()
}
}),
monitor((error) => callback(error))
)
}

function onDone() {
reader.cancel().catch(
// we don't care if cancel fails, but we still need to catch the error to avoid reporting it
// as an unhandled rejection
noop
)

let completeBuffer: Uint8Array
if (chunks.length === 1) {
// optim: if the response is small enough to fit in a single buffer (provided by the browser), just
// use it directly.
completeBuffer = chunks[0]
} else {
// else, we need to copy buffers into a larger buffer to concatenate them.
completeBuffer = new Uint8Array(readBytesCount)
let offset = 0
chunks.forEach((chunk) => {
completeBuffer.set(chunk, offset)
offset += chunk.length
})
}

callback(undefined, completeBuffer.slice(0, limit), completeBuffer.length > limit)
}
}
2 changes: 2 additions & 0 deletions test/e2e/lib/framework/createTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ interface TestContext {
crossOriginUrl: string
serverEvents: EventRegistry
bridgeEvents: EventRegistry
servers: Servers
}

type TestRunner = (testContext: TestContext) => Promise<void>
Expand Down Expand Up @@ -178,6 +179,7 @@ function createTestContext(servers: Servers): TestContext {
crossOriginUrl: servers.crossOrigin.url,
serverEvents: new EventRegistry(),
bridgeEvents: new EventRegistry(),
servers,
}
}

Expand Down
Loading

0 comments on commit 486faa2

Please sign in to comment.