From d245403dbffd5ebb86721f1ceb9dc654662bbbfd Mon Sep 17 00:00:00 2001 From: Bastien Caudan Date: Fri, 5 Feb 2021 11:18:17 +0100 Subject: [PATCH] =?UTF-8?q?=E2=9A=97=20[RUMF-823]=20monitor=20deflate=20wo?= =?UTF-8?q?rker?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/src/domain/internalMonitoring.ts | 2 +- packages/core/src/index.ts | 1 + .../src/domain/deflateSegmentWriter.ts | 45 ++++++------ .../src/domain/deflateWorker.d.ts | 1 + .../rum-recorder/src/domain/deflateWorker.js | 68 ++++++++++++------- 5 files changed, 72 insertions(+), 45 deletions(-) diff --git a/packages/core/src/domain/internalMonitoring.ts b/packages/core/src/domain/internalMonitoring.ts index 92c9ae0562..2fcee3541c 100644 --- a/packages/core/src/domain/internalMonitoring.ts +++ b/packages/core/src/domain/internalMonitoring.ts @@ -132,7 +132,7 @@ export function addMonitoringMessage(message: string, context?: Context) { }) } -function addErrorToMonitoringBatch(e: unknown) { +export function addErrorToMonitoringBatch(e: unknown) { addToMonitoringBatch({ ...formatError(e), status: StatusType.error, diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 1d7fe5f3c2..d45f6f03da 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -16,6 +16,7 @@ export { monitored, monitor, addMonitoringMessage, + addErrorToMonitoringBatch, setDebugMode, } from './domain/internalMonitoring' export { Observable } from './tools/observable' diff --git a/packages/rum-recorder/src/domain/deflateSegmentWriter.ts b/packages/rum-recorder/src/domain/deflateSegmentWriter.ts index bfd5b9f36b..846fa3449d 100644 --- a/packages/rum-recorder/src/domain/deflateSegmentWriter.ts +++ b/packages/rum-recorder/src/domain/deflateSegmentWriter.ts @@ -1,4 +1,4 @@ -import { addMonitoringMessage } from '@datadog/browser-core' +import { addMonitoringMessage, addErrorToMonitoringBatch, monitor } from '@datadog/browser-core' import { SegmentMeta } from '../types' import { DeflateWorker } from './deflateWorker' import { SegmentWriter } from './segment' @@ -12,28 +12,33 @@ export class DeflateSegmentWriter implements SegmentWriter { private onWrote: (size: number) => void, private onFlushed: (data: Uint8Array, meta: SegmentMeta) => void ) { - worker.addEventListener('message', ({ data }) => { - if ('result' in data) { - let pendingMeta = this.pendingMeta.shift()! + worker.addEventListener( + 'message', + monitor(({ data }) => { + if ('error' in data) { + addErrorToMonitoringBatch(data.error) + } else if ('result' in data) { + let pendingMeta = this.pendingMeta.shift()! - // Messages should be received in the same order as they are sent, so the first - // 'pendingMeta' of the list should be the one corresponding to the handled message. - // But if something goes wrong in the worker and a response is lost, we need to avoid - // associating an incorrect meta to the flushed segment. Remove any pending meta with an id - // inferior to the one being waited for. - if (pendingMeta.id !== data.id) { - let lostCount = 0 - while (pendingMeta.id !== data.id) { - pendingMeta = this.pendingMeta.shift()! - lostCount += 1 + // Messages should be received in the same order as they are sent, so the first + // 'pendingMeta' of the list should be the one corresponding to the handled message. + // But if something goes wrong in the worker and a response is lost, we need to avoid + // associating an incorrect meta to the flushed segment. Remove any pending meta with an id + // inferior to the one being waited for. + if (pendingMeta.id !== data.id) { + let lostCount = 0 + while (pendingMeta.id !== data.id) { + pendingMeta = this.pendingMeta.shift()! + lostCount += 1 + } + addMonitoringMessage(`${lostCount} deflate worker responses have been lost`) } - addMonitoringMessage(`${lostCount} deflate worker responses have been lost`) + this.onFlushed(data.result, pendingMeta.meta) + } else { + this.onWrote(data.size) } - this.onFlushed(data.result, pendingMeta.meta) - } else { - this.onWrote(data.size) - } - }) + }) + ) } write(data: string): void { diff --git a/packages/rum-recorder/src/domain/deflateWorker.d.ts b/packages/rum-recorder/src/domain/deflateWorker.d.ts index f08d989910..319b9e83e4 100644 --- a/packages/rum-recorder/src/domain/deflateWorker.d.ts +++ b/packages/rum-recorder/src/domain/deflateWorker.d.ts @@ -30,3 +30,4 @@ export type DeflateWorkerResponse = id: number result: Uint8Array } + | { error: Error | string } diff --git a/packages/rum-recorder/src/domain/deflateWorker.js b/packages/rum-recorder/src/domain/deflateWorker.js index 047992bff5..3a41e66148 100644 --- a/packages/rum-recorder/src/domain/deflateWorker.js +++ b/packages/rum-recorder/src/domain/deflateWorker.js @@ -10,32 +10,52 @@ export function createDeflateWorker() { } function workerCodeFn() { - const { Deflate, constants } = makePakoDeflate() - - let deflate = new Deflate() - self.addEventListener('message', (event) => { - const data = event.data - switch (data.action) { - case 'write': - deflate.push(data.data, constants.Z_SYNC_FLUSH) - self.postMessage({ - id: data.id, - size: deflate.chunks.reduce((total, chunk) => total + chunk.length, 0), - }) - break - case 'flush': - if (data.data) { - deflate.push(data.data, constants.Z_SYNC_FLUSH) + monitor(function () { + const { Deflate, constants } = makePakoDeflate() + + let deflate = new Deflate() + self.addEventListener( + 'message', + monitor((event) => { + const data = event.data + switch (data.action) { + case 'write': + deflate.push(data.data, constants.Z_SYNC_FLUSH) + self.postMessage({ + id: data.id, + size: deflate.chunks.reduce((total, chunk) => total + chunk.length, 0), + }) + break + case 'flush': + if (data.data) { + deflate.push(data.data, constants.Z_SYNC_FLUSH) + } + deflate.push('', constants.Z_FINISH) + self.postMessage({ + id: data.id, + result: deflate.result, + }) + deflate = new Deflate() + break } - deflate.push('', constants.Z_FINISH) - self.postMessage({ - id: data.id, - result: deflate.result, - }) - deflate = new Deflate() - break + }) + ) + })() + + function monitor(fn) { + return function () { + try { + return fn.apply(this, arguments) + } catch (e) { + try { + self.postMessage({ error: e }) + } catch (_) { + // DATA_CLONE_ERR, cf https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm + self.postMessage({ error: '' + e }) + } + } } - }) + } // https://github.com/nodeca/pako/blob/034669ba0f1a4c0590e45f7c2820128200f972b3/dist/pako_deflate.es5.js function makePakoDeflate() {