Skip to content

Commit

Permalink
♻️ Harmonize batch/segment limit namings (#1532)
Browse files Browse the repository at this point in the history
  • Loading branch information
amortemousque authored May 16, 2022
1 parent 7a85025 commit 3a004b2
Show file tree
Hide file tree
Showing 23 changed files with 192 additions and 182 deletions.
8 changes: 4 additions & 4 deletions packages/core/src/domain/configuration/configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ export interface Configuration extends TransportConfiguration {
// Batch configuration
batchBytesLimit: number
flushTimeout: number
maxBatchSize: number
maxMessageSize: number
batchMessagesLimit: number
messageBytesLimit: number
}

export function validateAndBuildConfiguration(initConfiguration: InitConfiguration): Configuration | undefined {
Expand Down Expand Up @@ -119,8 +119,8 @@ export function validateAndBuildConfiguration(initConfiguration: InitConfigurati
/**
* Logs intake limit
*/
maxBatchSize: 50,
maxMessageSize: 256 * ONE_KILO_BYTE,
batchMessagesLimit: 50,
messageBytesLimit: 256 * ONE_KILO_BYTE,
},
computeTransportConfiguration(initConfiguration)
)
Expand Down
34 changes: 19 additions & 15 deletions packages/core/src/transport/batch.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { Batch } from './batch'
import type { HttpRequest } from './httpRequest'

describe('batch', () => {
const MAX_SIZE = 3
const BATCH_MESSAGES_LIMIT = 3
const BATCH_BYTES_LIMIT = 100
const MESSAGE_BYTES_LIMIT = 50 * 1024
const FLUSH_TIMEOUT = 60 * 1000
Expand All @@ -15,7 +15,7 @@ describe('batch', () => {
beforeEach(() => {
transport = { send: noop } as unknown as HttpRequest
spyOn(transport, 'send')
batch = new Batch(transport, MAX_SIZE, BATCH_BYTES_LIMIT, MESSAGE_BYTES_LIMIT, FLUSH_TIMEOUT)
batch = new Batch(transport, BATCH_MESSAGES_LIMIT, BATCH_BYTES_LIMIT, MESSAGE_BYTES_LIMIT, FLUSH_TIMEOUT)
})

it('should add context to message', () => {
Expand All @@ -36,31 +36,35 @@ describe('batch', () => {
expect(transport.send).not.toHaveBeenCalled()
})

it('should calculate the byte size of message composed of 1 byte characters ', () => {
expect(batch.sizeInBytes('1234')).toEqual(4)
it('should count the bytes of a message composed of 1 byte characters', () => {
expect(batch.computeBytesCount('1234')).toEqual(4)
})

it('should calculate the byte size of message composed of multiple bytes characters ', () => {
expect(batch.sizeInBytes('🪐')).toEqual(4)
it('should count the bytes of a message composed of multiple bytes characters', () => {
expect(batch.computeBytesCount('🪐')).toEqual(4)
})

it('should flush when max size is reached', () => {
it('should flush when the message count limit is reached', () => {
batch.add({ message: '1' })
batch.add({ message: '2' })
batch.add({ message: '3' })
expect(transport.send).toHaveBeenCalledWith(
'{"message":"1"}\n{"message":"2"}\n{"message":"3"}',
jasmine.any(Number),
'max_messages_count'
'batch_messages_limit'
)
})

it('should flush when new message will overflow bytes limit', () => {
it('should flush when a new message will overflow the bytes limit', () => {
batch.add({ message: '50 bytes - xxxxxxxxxxxxxxxxxxxxxxxxx' })
expect(transport.send).not.toHaveBeenCalled()

batch.add({ message: '60 bytes - xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' })
expect(transport.send).toHaveBeenCalledWith('{"message":"50 bytes - xxxxxxxxxxxxxxxxxxxxxxxxx"}', 50, 'max_size')
expect(transport.send).toHaveBeenCalledWith(
'{"message":"50 bytes - xxxxxxxxxxxxxxxxxxxxxxxxx"}',
50,
'batch_bytes_limit'
)

batch.flush()
expect(transport.send).toHaveBeenCalledWith(
Expand All @@ -70,7 +74,7 @@ describe('batch', () => {
)
})

it('should consider separator size when computing the size', () => {
it('should consider separators when computing the byte count', () => {
batch.add({ message: '30 bytes - xxxxx' }) // batch: 30 sep: 0
batch.add({ message: '30 bytes - xxxxx' }) // batch: 60 sep: 1
batch.add({ message: '39 bytes - xxxxxxxxxxxxxx' }) // batch: 99 sep: 2
Expand All @@ -82,7 +86,7 @@ describe('batch', () => {
)
})

it('should call send one time when the size is too high and the batch is empty', () => {
it('should call send one time when the byte count is too high and the batch is empty', () => {
const message = '101 bytes - xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
batch.add({ message })
expect(transport.send).toHaveBeenCalledWith(`{"message":"${message}"}`, 101, jasmine.any(String))
Expand All @@ -98,7 +102,7 @@ describe('batch', () => {

it('should flush after timeout', () => {
const clock = sinon.useFakeTimers()
batch = new Batch(transport, MAX_SIZE, BATCH_BYTES_LIMIT, MESSAGE_BYTES_LIMIT, 10)
batch = new Batch(transport, BATCH_MESSAGES_LIMIT, BATCH_BYTES_LIMIT, MESSAGE_BYTES_LIMIT, 10)
batch.add({ message: '50 bytes - xxxxxxxxxxxxxxxxxxxxxxxxx' })
clock.tick(100)

Expand All @@ -107,9 +111,9 @@ describe('batch', () => {
clock.restore()
})

it('should not send a message with a size above the limit', () => {
it('should not send a message with a bytes size above the limit', () => {
const warnStub = sinon.stub(console, 'warn')
batch = new Batch(transport, MAX_SIZE, BATCH_BYTES_LIMIT, 50, FLUSH_TIMEOUT)
batch = new Batch(transport, BATCH_MESSAGES_LIMIT, BATCH_BYTES_LIMIT, 50, FLUSH_TIMEOUT)
batch.add({ message: '50 bytes - xxxxxxxxxxxxx' })

expect(transport.send).not.toHaveBeenCalled()
Expand Down
71 changes: 37 additions & 34 deletions packages/core/src/transport/batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ const HAS_MULTI_BYTES_CHARACTERS = /[^\u0000-\u007F]/
export class Batch {
private pushOnlyBuffer: string[] = []
private upsertBuffer: { [key: string]: string } = {}
private bufferBytesSize = 0
private bufferMessageCount = 0
private bufferBytesCount = 0
private bufferMessagesCount = 0

constructor(
private request: HttpRequest,
private maxSize: number,
private bytesLimit: number,
private maxMessageSize: number,
private batchMessagesLimit: number,
private batchBytesLimit: number,
private messageBytesLimit: number,
private flushTimeout: number,
private beforeUnloadCallback: () => void = noop
) {
Expand All @@ -35,18 +35,18 @@ export class Batch {
}

flush(reason?: string) {
if (this.bufferMessageCount !== 0) {
if (this.bufferMessagesCount !== 0) {
const messages = this.pushOnlyBuffer.concat(objectValues(this.upsertBuffer))
this.request.send(messages.join('\n'), this.bufferBytesSize, reason)
this.request.send(messages.join('\n'), this.bufferBytesCount, reason)
this.pushOnlyBuffer = []
this.upsertBuffer = {}
this.bufferBytesSize = 0
this.bufferMessageCount = 0
this.bufferBytesCount = 0
this.bufferMessagesCount = 0
}
}

sizeInBytes(candidate: string) {
// Accurate byte size computations can degrade performances when there is a lot of events to process
computeBytesCount(candidate: string) {
// Accurate bytes count computations can degrade performances when there is a lot of events to process
if (!HAS_MULTI_BYTES_CHARACTERS.test(candidate)) {
return candidate.length
}
Expand All @@ -59,71 +59,74 @@ export class Batch {
}

private addOrUpdate(message: Context, key?: string) {
const { processedMessage, messageBytesSize } = this.process(message)
if (messageBytesSize >= this.maxMessageSize) {
display.warn(`Discarded a message whose size was bigger than the maximum allowed size ${this.maxMessageSize}KB.`)
const { processedMessage, messageBytesCount } = this.process(message)
if (messageBytesCount >= this.messageBytesLimit) {
display.warn(
`Discarded a message whose size was bigger than the maximum allowed size ${this.messageBytesLimit}KB.`
)
return
}
if (this.hasMessageFor(key)) {
this.remove(key)
}
if (this.willReachedBytesLimitWith(messageBytesSize)) {
this.flush('max_size')
if (this.willReachedBytesLimitWith(messageBytesCount)) {
this.flush('batch_bytes_limit')
}
this.push(processedMessage, messageBytesSize, key)

this.push(processedMessage, messageBytesCount, key)
if (this.isFull()) {
this.flush('max_messages_count')
this.flush('batch_messages_limit')
}
}

private process(message: Context) {
const processedMessage = jsonStringify(message)!
const messageBytesSize = this.sizeInBytes(processedMessage)
return { processedMessage, messageBytesSize }
const messageBytesCount = this.computeBytesCount(processedMessage)
return { processedMessage, messageBytesCount }
}

private push(processedMessage: string, messageBytesSize: number, key?: string) {
if (this.bufferMessageCount > 0) {
private push(processedMessage: string, messageBytesCount: number, key?: string) {
if (this.bufferMessagesCount > 0) {
// \n separator at serialization
this.bufferBytesSize += 1
this.bufferBytesCount += 1
}
if (key !== undefined) {
this.upsertBuffer[key] = processedMessage
} else {
this.pushOnlyBuffer.push(processedMessage)
}
this.bufferBytesSize += messageBytesSize
this.bufferMessageCount += 1
this.bufferBytesCount += messageBytesCount
this.bufferMessagesCount += 1
}

private remove(key: string) {
const removedMessage = this.upsertBuffer[key]
delete this.upsertBuffer[key]
const messageBytesSize = this.sizeInBytes(removedMessage)
this.bufferBytesSize -= messageBytesSize
this.bufferMessageCount -= 1
if (this.bufferMessageCount > 0) {
this.bufferBytesSize -= 1
const messageBytesCount = this.computeBytesCount(removedMessage)
this.bufferBytesCount -= messageBytesCount
this.bufferMessagesCount -= 1
if (this.bufferMessagesCount > 0) {
this.bufferBytesCount -= 1
}
}

private hasMessageFor(key?: string): key is string {
return key !== undefined && this.upsertBuffer[key] !== undefined
}

private willReachedBytesLimitWith(messageBytesSize: number) {
private willReachedBytesLimitWith(messageBytesCount: number) {
// byte of the separator at the end of the message
return this.bufferBytesSize + messageBytesSize + 1 >= this.bytesLimit
return this.bufferBytesCount + messageBytesCount + 1 >= this.batchBytesLimit
}

private isFull() {
return this.bufferMessageCount === this.maxSize || this.bufferBytesSize >= this.bytesLimit
return this.bufferMessagesCount === this.batchMessagesLimit || this.bufferBytesCount >= this.batchBytesLimit
}

private flushPeriodically() {
setTimeout(
monitor(() => {
this.flush('max_duration')
this.flush('batch_flush_timeout')
this.flushPeriodically()
}),
this.flushTimeout
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/transport/httpRequest.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ describe('httpRequest', () => {
expect(server.requests[0].requestBody).toEqual('{"foo":"bar1"}\n{"foo":"bar2"}')
})

it('should use sendBeacon when the size is correct', () => {
it('should use sendBeacon when the bytes count is correct', () => {
spyOn(navigator, 'sendBeacon').and.callFake(() => true)

request.send('{"foo":"bar1"}\n{"foo":"bar2"}', 10)

expect(navigator.sendBeacon).toHaveBeenCalled()
})

it('should use xhr over sendBeacon when the size too high', () => {
it('should use xhr over sendBeacon when the bytes count is too high', () => {
spyOn(navigator, 'sendBeacon').and.callFake(() => true)

request.send('{"foo":"bar1"}\n{"foo":"bar2"}', BATCH_BYTES_LIMIT)
Expand Down
6 changes: 3 additions & 3 deletions packages/core/src/transport/httpRequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@ import { addFailedSendBeacon } from './failedSendBeacon'
export class HttpRequest {
constructor(private endpointBuilder: EndpointBuilder, private bytesLimit: number) {}

send(data: string | FormData, size: number, reason?: string) {
send(data: string | FormData, bytesCount: number, reason?: string) {
const url = this.endpointBuilder.build()
const canUseBeacon = !!navigator.sendBeacon && size < this.bytesLimit
const canUseBeacon = !!navigator.sendBeacon && bytesCount < this.bytesLimit
if (canUseBeacon) {
try {
const isQueued = navigator.sendBeacon(url, data)
if (isQueued) {
return
}

addFailedSendBeacon(this.endpointBuilder.endpointType, size, reason)
addFailedSendBeacon(this.endpointBuilder.endpointType, bytesCount, reason)
} catch (e) {
reportBeaconError(e)
}
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/transport/startBatchWithReplica.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ export function startBatchWithReplica<T extends Context>(
function createBatch(endpointBuilder: EndpointBuilder) {
return new Batch(
new HttpRequest(endpointBuilder, configuration.batchBytesLimit),
configuration.maxBatchSize,
configuration.batchMessagesLimit,
configuration.batchBytesLimit,
configuration.maxMessageSize,
configuration.messageBytesLimit,
configuration.flushTimeout
)
}
Expand Down
4 changes: 2 additions & 2 deletions packages/logs/src/boot/startLogs.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ describe('logs', () => {
baseConfiguration = {
...validateAndBuildLogsConfiguration(initConfiguration)!,
logsEndpointBuilder: stubEndpointBuilder('https://localhost/v1/input/log'),
maxBatchSize: 1,
batchMessagesLimit: 1,
}
logger = new Logger((...params) => handleLog(...params))
server = sinon.fakeServer.create()
Expand Down Expand Up @@ -80,7 +80,7 @@ describe('logs', () => {
})

it('should all use the same batch', () => {
;({ handleLog } = startLogs({ ...baseConfiguration, maxBatchSize: 3 }, () => COMMON_CONTEXT, logger))
;({ handleLog } = startLogs({ ...baseConfiguration, batchMessagesLimit: 3 }, () => COMMON_CONTEXT, logger))

handleLog(DEFAULT_MESSAGE, logger)
handleLog(DEFAULT_MESSAGE, logger)
Expand Down
4 changes: 2 additions & 2 deletions packages/rum-core/src/domain/requestCollection.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ describe('collect fetch', () => {
configuration = {
...validateAndBuildRumConfiguration({ clientToken: 'xxx', applicationId: 'xxx' })!,
...SPEC_ENDPOINTS,
maxBatchSize: 1,
batchMessagesLimit: 1,
}
fetchStubManager = stubFetch()

Expand Down Expand Up @@ -151,7 +151,7 @@ describe('collect xhr', () => {
configuration = {
...validateAndBuildRumConfiguration({ clientToken: 'xxx', applicationId: 'xxx' })!,
...SPEC_ENDPOINTS,
maxBatchSize: 1,
batchMessagesLimit: 1,
}
stubXhrManager = stubXhr()
startSpy = jasmine.createSpy('requestStart')
Expand Down
4 changes: 2 additions & 2 deletions packages/rum-core/src/transport/startRumBatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ function makeRumBatch(configuration: RumConfiguration, lifeCycle: LifeCycle): Ru
function createRumBatch(endpointBuilder: EndpointBuilder, unloadCallback?: () => void) {
return new Batch(
new HttpRequest(endpointBuilder, configuration.batchBytesLimit),
configuration.maxBatchSize,
configuration.batchMessagesLimit,
configuration.batchBytesLimit,
configuration.maxMessageSize,
configuration.messageBytesLimit,
configuration.flushTimeout,
unloadCallback
)
Expand Down
Loading

0 comments on commit 3a004b2

Please sign in to comment.