From e5a65b5c18bbc15c87ba98c1d84a448661f6b913 Mon Sep 17 00:00:00 2001 From: Francesco Novy Date: Thu, 25 Sep 2025 16:22:23 +0200 Subject: [PATCH 1/6] ref(core): Improve promise buffer --- packages/core/src/utils/promisebuffer.ts | 87 ++++++++--------- .../core/test/lib/utils/promisebuffer.test.ts | 96 +++++++++++++------ 2 files changed, 111 insertions(+), 72 deletions(-) diff --git a/packages/core/src/utils/promisebuffer.ts b/packages/core/src/utils/promisebuffer.ts index 2830e8897129..8f849585c51b 100644 --- a/packages/core/src/utils/promisebuffer.ts +++ b/packages/core/src/utils/promisebuffer.ts @@ -1,9 +1,9 @@ -import { rejectedSyncPromise, resolvedSyncPromise, SyncPromise } from './syncpromise'; +import { rejectedSyncPromise, resolvedSyncPromise } from './syncpromise'; export interface PromiseBuffer { // exposes the internal array so tests can assert on the state of it. // XXX: this really should not be public api. - $: Array>; + $: PromiseLike[]; add(taskProducer: () => PromiseLike): PromiseLike; drain(timeout?: number): PromiseLike; } @@ -14,11 +14,11 @@ export const SENTRY_BUFFER_FULL_ERROR = Symbol.for('SentryBufferFullError'); * Creates an new PromiseBuffer object with the specified limit * @param limit max number of promises that can be stored in the buffer */ -export function makePromiseBuffer(limit?: number): PromiseBuffer { - const buffer: Array> = []; +export function makePromiseBuffer(limit: number = 100): PromiseBuffer { + const buffer: Set> = new Set(); function isReady(): boolean { - return limit === undefined || buffer.length < limit; + return buffer.size < limit; } /** @@ -27,8 +27,8 @@ export function makePromiseBuffer(limit?: number): PromiseBuffer { * @param task Can be any PromiseLike * @returns Removed promise. */ - function remove(task: PromiseLike): PromiseLike { - return buffer.splice(buffer.indexOf(task), 1)[0] || Promise.resolve(undefined); + function remove(task: PromiseLike): void { + buffer.delete(task); } /** @@ -48,22 +48,26 @@ export function makePromiseBuffer(limit?: number): PromiseBuffer { // start the task and add its promise to the queue const task = taskProducer(); - if (buffer.indexOf(task) === -1) { - buffer.push(task); - } - void task - .then(() => remove(task)) - // Use `then(null, rejectionHandler)` rather than `catch(rejectionHandler)` so that we can use `PromiseLike` - // rather than `Promise`. `PromiseLike` doesn't have a `.catch` method, making its polyfill smaller. (ES5 didn't - // have promises, so TS has to polyfill when down-compiling.) - .then(null, () => - remove(task).then(null, () => { - // We have to add another catch here because `remove()` starts a new promise chain. - }), - ); + buffer.add(task); + void task.then( + () => remove(task), + () => remove(task), + ); return task; } + function drainNextSyncPromise(): PromiseLike { + const item = buffer.values().next().value; + + if (!item) { + return resolvedSyncPromise(true); + } + + return resolvedSyncPromise(item).then(() => { + return drainNextSyncPromise(); + }); + } + /** * Wait for all promises in the queue to resolve or for timeout to expire, whichever comes first. * @@ -74,34 +78,27 @@ export function makePromiseBuffer(limit?: number): PromiseBuffer { * `false` otherwise */ function drain(timeout?: number): PromiseLike { - return new SyncPromise((resolve, reject) => { - let counter = buffer.length; - - if (!counter) { - return resolve(true); - } - - // wait for `timeout` ms and then resolve to `false` (if not cancelled first) - const capturedSetTimeout = setTimeout(() => { - if (timeout && timeout > 0) { - resolve(false); - } - }, timeout); - - // if all promises resolve in time, cancel the timer and resolve to `true` - buffer.forEach(item => { - void resolvedSyncPromise(item).then(() => { - if (!--counter) { - clearTimeout(capturedSetTimeout); - resolve(true); - } - }, reject); - }); - }); + if (!buffer.size) { + return resolvedSyncPromise(true); + } + + const drainPromise = drainNextSyncPromise(); + + if (!timeout) { + return drainPromise; + } + + const promises = [drainPromise, new Promise(resolve => setTimeout(() => resolve(false), timeout))]; + + // Promise.race will resolve to the first promise that resolves or rejects + // So if the drainPromise resolves, the timeout promise will be ignored + return Promise.race(promises); } return { - $: buffer, + get $(): PromiseLike[] { + return Array.from(buffer); + }, add, drain, }; diff --git a/packages/core/test/lib/utils/promisebuffer.test.ts b/packages/core/test/lib/utils/promisebuffer.test.ts index 618de06322a0..5390a32001f9 100644 --- a/packages/core/test/lib/utils/promisebuffer.test.ts +++ b/packages/core/test/lib/utils/promisebuffer.test.ts @@ -1,24 +1,33 @@ import { describe, expect, test, vi } from 'vitest'; import { makePromiseBuffer } from '../../../src/utils/promisebuffer'; -import { SyncPromise } from '../../../src/utils/syncpromise'; +import { rejectedSyncPromise, resolvedSyncPromise } from '../../../src/utils/syncpromise'; describe('PromiseBuffer', () => { describe('add()', () => { - test('no limit', () => { - const buffer = makePromiseBuffer(); - const p = vi.fn(() => new SyncPromise(resolve => setTimeout(resolve))); - void buffer.add(p); - expect(buffer.$.length).toEqual(1); + test('sync promises', () => { + const buffer = makePromiseBuffer(1); + let task1; + const producer1 = vi.fn(() => { + task1 = resolvedSyncPromise(); + return task1; + }); + const producer2 = vi.fn(() => resolvedSyncPromise()); + expect(buffer.add(producer1)).toEqual(task1); + void expect(buffer.add(producer2)).rejects.toThrowError(); + // This is immediately executed and removed again from the buffer + expect(buffer.$.length).toEqual(0); + expect(producer1).toHaveBeenCalled(); + expect(producer2).toHaveBeenCalled(); }); - test('with limit', () => { + test('async promises', () => { const buffer = makePromiseBuffer(1); let task1; const producer1 = vi.fn(() => { - task1 = new SyncPromise(resolve => setTimeout(resolve)); + task1 = new Promise(resolve => setTimeout(resolve, 1)); return task1; }); - const producer2 = vi.fn(() => new SyncPromise(resolve => setTimeout(resolve))); + const producer2 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1))); expect(buffer.add(producer1)).toEqual(task1); void expect(buffer.add(producer2)).rejects.toThrowError(); expect(buffer.$.length).toEqual(1); @@ -28,25 +37,60 @@ describe('PromiseBuffer', () => { }); describe('drain()', () => { - test('without timeout', async () => { + test('drains all promises without timeout', async () => { const buffer = makePromiseBuffer(); - for (let i = 0; i < 5; i++) { - void buffer.add(() => new SyncPromise(resolve => setTimeout(resolve))); - } + + const p1 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1))); + const p2 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1))); + const p3 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1))); + const p4 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1))); + const p5 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1))); + + [p1, p2, p3, p4, p5].forEach(p => { + void buffer.add(p); + }); + expect(buffer.$.length).toEqual(5); const result = await buffer.drain(); expect(result).toEqual(true); expect(buffer.$.length).toEqual(0); + + expect(p1).toHaveBeenCalled(); + expect(p2).toHaveBeenCalled(); + expect(p3).toHaveBeenCalled(); + expect(p4).toHaveBeenCalled(); + expect(p5).toHaveBeenCalled(); }); - test('with timeout', async () => { + test('drains all promises with timeout xxx', async () => { const buffer = makePromiseBuffer(); - for (let i = 0; i < 5; i++) { - void buffer.add(() => new SyncPromise(resolve => setTimeout(resolve, 100))); - } + + const p1 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 2))); + const p2 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 4))); + const p3 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 6))); + const p4 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 8))); + const p5 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 10))); + + [p1, p2, p3, p4, p5].forEach(p => { + void buffer.add(p); + }); + + expect(p1).toHaveBeenCalled(); + expect(p2).toHaveBeenCalled(); + expect(p3).toHaveBeenCalled(); + expect(p4).toHaveBeenCalled(); + expect(p5).toHaveBeenCalled(); + expect(buffer.$.length).toEqual(5); - const result = await buffer.drain(50); + const result = await buffer.drain(8); expect(result).toEqual(false); + // p5 is still in the buffer + expect(buffer.$.length).toEqual(1); + + // Now drain final item + const result2 = await buffer.drain(); + expect(result2).toEqual(true); + expect(buffer.$.length).toEqual(0); }); test('on empty buffer', async () => { @@ -60,7 +104,7 @@ describe('PromiseBuffer', () => { test('resolved promises should not show up in buffer length', async () => { const buffer = makePromiseBuffer(); - const producer = () => new SyncPromise(resolve => setTimeout(resolve)); + const producer = () => new Promise(resolve => setTimeout(resolve, 1)); const task = buffer.add(producer); expect(buffer.$.length).toEqual(1); await task; @@ -69,20 +113,18 @@ describe('PromiseBuffer', () => { test('rejected promises should not show up in buffer length', async () => { const buffer = makePromiseBuffer(); - const producer = () => new SyncPromise((_, reject) => setTimeout(reject)); + const error = new Error('whoops'); + const producer = () => new Promise((_, reject) => setTimeout(() => reject(error), 1)); const task = buffer.add(producer); expect(buffer.$.length).toEqual(1); - try { - await task; - } catch { - // no-empty - } + + await expect(task).rejects.toThrow(error); expect(buffer.$.length).toEqual(0); }); test('resolved task should give an access to the return value', async () => { const buffer = makePromiseBuffer(); - const producer = () => new SyncPromise(resolve => setTimeout(() => resolve('test'))); + const producer = () => resolvedSyncPromise('test'); const task = buffer.add(producer); const result = await task; expect(result).toEqual('test'); @@ -91,7 +133,7 @@ describe('PromiseBuffer', () => { test('rejected task should give an access to the return value', async () => { expect.assertions(1); const buffer = makePromiseBuffer(); - const producer = () => new SyncPromise((_, reject) => setTimeout(() => reject(new Error('whoops')))); + const producer = () => rejectedSyncPromise(new Error('whoops')); const task = buffer.add(producer); try { await task; From 6dd8651189760feb3b276fe539659d46638fc0b9 Mon Sep 17 00:00:00 2001 From: Francesco Novy Date: Thu, 25 Sep 2025 16:45:01 +0200 Subject: [PATCH 2/6] no need to wrap it --- packages/core/src/utils/promisebuffer.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/core/src/utils/promisebuffer.ts b/packages/core/src/utils/promisebuffer.ts index 8f849585c51b..72e7914e36c3 100644 --- a/packages/core/src/utils/promisebuffer.ts +++ b/packages/core/src/utils/promisebuffer.ts @@ -56,15 +56,15 @@ export function makePromiseBuffer(limit: number = 100): PromiseBuffer { return task; } - function drainNextSyncPromise(): PromiseLike { - const item = buffer.values().next().value; + function _drainNextSyncPromise(): PromiseLike { + const item = buffer.values().next().value as PromiseLike; if (!item) { return resolvedSyncPromise(true); } - return resolvedSyncPromise(item).then(() => { - return drainNextSyncPromise(); + return item.then(() => { + return _drainNextSyncPromise(); }); } @@ -82,7 +82,7 @@ export function makePromiseBuffer(limit: number = 100): PromiseBuffer { return resolvedSyncPromise(true); } - const drainPromise = drainNextSyncPromise(); + const drainPromise = _drainNextSyncPromise(); if (!timeout) { return drainPromise; From 3e40e17dc7ab4f4ecf2b93c91b48858aaa7fa538 Mon Sep 17 00:00:00 2001 From: Francesco Novy Date: Thu, 25 Sep 2025 16:47:27 +0200 Subject: [PATCH 3/6] drain all at once --- packages/core/src/utils/promisebuffer.ts | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/packages/core/src/utils/promisebuffer.ts b/packages/core/src/utils/promisebuffer.ts index 72e7914e36c3..1b4067abe831 100644 --- a/packages/core/src/utils/promisebuffer.ts +++ b/packages/core/src/utils/promisebuffer.ts @@ -56,18 +56,6 @@ export function makePromiseBuffer(limit: number = 100): PromiseBuffer { return task; } - function _drainNextSyncPromise(): PromiseLike { - const item = buffer.values().next().value as PromiseLike; - - if (!item) { - return resolvedSyncPromise(true); - } - - return item.then(() => { - return _drainNextSyncPromise(); - }); - } - /** * Wait for all promises in the queue to resolve or for timeout to expire, whichever comes first. * @@ -82,7 +70,7 @@ export function makePromiseBuffer(limit: number = 100): PromiseBuffer { return resolvedSyncPromise(true); } - const drainPromise = _drainNextSyncPromise(); + const drainPromise = Promise.all(Array.from(buffer)).then(() => true); if (!timeout) { return drainPromise; From 093539b81e4461c28a3df6b034b59dea7717b6bc Mon Sep 17 00:00:00 2001 From: Francesco Novy Date: Thu, 25 Sep 2025 16:51:13 +0200 Subject: [PATCH 4/6] handle equivalent promises --- .../core/test/lib/utils/promisebuffer.test.ts | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/packages/core/test/lib/utils/promisebuffer.test.ts b/packages/core/test/lib/utils/promisebuffer.test.ts index 5390a32001f9..0d075c63a98d 100644 --- a/packages/core/test/lib/utils/promisebuffer.test.ts +++ b/packages/core/test/lib/utils/promisebuffer.test.ts @@ -34,6 +34,27 @@ describe('PromiseBuffer', () => { expect(producer1).toHaveBeenCalled(); expect(producer2).not.toHaveBeenCalled(); }); + + test('handles multiple equivalent promises', async () => { + const buffer = makePromiseBuffer(10); + + const promise = new Promise(resolve => setTimeout(resolve, 1)); + + const producer = vi.fn(() => promise); + const producer2 = vi.fn(() => promise); + + expect(buffer.add(producer)).toEqual(promise); + expect(buffer.add(producer2)).toEqual(promise); + + expect(buffer.$.length).toEqual(1); + + expect(producer).toHaveBeenCalled(); + expect(producer2).toHaveBeenCalled(); + + await buffer.drain(); + + expect(buffer.$.length).toEqual(0); + }); }); describe('drain()', () => { From e6220bb59c50132440e4eed9615c121fb5a6d1a5 Mon Sep 17 00:00:00 2001 From: Francesco Novy Date: Thu, 25 Sep 2025 16:54:01 +0200 Subject: [PATCH 5/6] more tests --- .../core/test/lib/utils/promisebuffer.test.ts | 40 ++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/packages/core/test/lib/utils/promisebuffer.test.ts b/packages/core/test/lib/utils/promisebuffer.test.ts index 0d075c63a98d..dd78fe7689e1 100644 --- a/packages/core/test/lib/utils/promisebuffer.test.ts +++ b/packages/core/test/lib/utils/promisebuffer.test.ts @@ -4,6 +4,44 @@ import { rejectedSyncPromise, resolvedSyncPromise } from '../../../src/utils/syn describe('PromiseBuffer', () => { describe('add()', () => { + test('enforces limit of promises', async () => { + const buffer = makePromiseBuffer(5); + + const producer1 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1))); + const producer2 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1))); + const producer3 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1))); + const producer4 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1))); + const producer5 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1))); + const producer6 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1))); + + void buffer.add(producer1); + void buffer.add(producer2); + void buffer.add(producer3); + void buffer.add(producer4); + void buffer.add(producer5); + void expect(buffer.add(producer6)).rejects.toThrowError(); + + expect(producer1).toHaveBeenCalledTimes(1); + expect(producer2).toHaveBeenCalledTimes(1); + expect(producer3).toHaveBeenCalledTimes(1); + expect(producer4).toHaveBeenCalledTimes(1); + expect(producer5).toHaveBeenCalledTimes(1); + expect(producer6).not.toHaveBeenCalled(); + + expect(buffer.$.length).toEqual(5); + + await buffer.drain(); + + expect(buffer.$.length).toEqual(0); + + expect(producer1).toHaveBeenCalledTimes(1); + expect(producer2).toHaveBeenCalledTimes(1); + expect(producer3).toHaveBeenCalledTimes(1); + expect(producer4).toHaveBeenCalledTimes(1); + expect(producer5).toHaveBeenCalledTimes(1); + expect(producer6).not.toHaveBeenCalled(); + }); + test('sync promises', () => { const buffer = makePromiseBuffer(1); let task1; @@ -83,7 +121,7 @@ describe('PromiseBuffer', () => { expect(p5).toHaveBeenCalled(); }); - test('drains all promises with timeout xxx', async () => { + test('drains all promises with timeout', async () => { const buffer = makePromiseBuffer(); const p1 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 2))); From be6b08e19a6b133ec7fee74e38d7f624aafebe80 Mon Sep 17 00:00:00 2001 From: Francesco Novy Date: Fri, 26 Sep 2025 12:06:23 +0200 Subject: [PATCH 6/6] PR feedback --- packages/core/src/utils/promisebuffer.ts | 3 +- .../core/test/lib/utils/promisebuffer.test.ts | 33 ++++++++++++++++--- 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/packages/core/src/utils/promisebuffer.ts b/packages/core/src/utils/promisebuffer.ts index 1b4067abe831..f66077a76fd5 100644 --- a/packages/core/src/utils/promisebuffer.ts +++ b/packages/core/src/utils/promisebuffer.ts @@ -70,7 +70,8 @@ export function makePromiseBuffer(limit: number = 100): PromiseBuffer { return resolvedSyncPromise(true); } - const drainPromise = Promise.all(Array.from(buffer)).then(() => true); + // We want to resolve even if one of the promises rejects + const drainPromise = Promise.allSettled(Array.from(buffer)).then(() => true); if (!timeout) { return drainPromise; diff --git a/packages/core/test/lib/utils/promisebuffer.test.ts b/packages/core/test/lib/utils/promisebuffer.test.ts index dd78fe7689e1..b1316302e6f6 100644 --- a/packages/core/test/lib/utils/promisebuffer.test.ts +++ b/packages/core/test/lib/utils/promisebuffer.test.ts @@ -19,7 +19,7 @@ describe('PromiseBuffer', () => { void buffer.add(producer3); void buffer.add(producer4); void buffer.add(producer5); - void expect(buffer.add(producer6)).rejects.toThrowError(); + await expect(buffer.add(producer6)).rejects.toThrowError(); expect(producer1).toHaveBeenCalledTimes(1); expect(producer2).toHaveBeenCalledTimes(1); @@ -42,7 +42,7 @@ describe('PromiseBuffer', () => { expect(producer6).not.toHaveBeenCalled(); }); - test('sync promises', () => { + test('sync promises', async () => { const buffer = makePromiseBuffer(1); let task1; const producer1 = vi.fn(() => { @@ -51,14 +51,18 @@ describe('PromiseBuffer', () => { }); const producer2 = vi.fn(() => resolvedSyncPromise()); expect(buffer.add(producer1)).toEqual(task1); - void expect(buffer.add(producer2)).rejects.toThrowError(); + const add2 = buffer.add(producer2); + // This is immediately executed and removed again from the buffer expect(buffer.$.length).toEqual(0); + + await expect(add2).resolves.toBeUndefined(); + expect(producer1).toHaveBeenCalled(); expect(producer2).toHaveBeenCalled(); }); - test('async promises', () => { + test('async promises', async () => { const buffer = makePromiseBuffer(1); let task1; const producer1 = vi.fn(() => { @@ -67,8 +71,12 @@ describe('PromiseBuffer', () => { }); const producer2 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1))); expect(buffer.add(producer1)).toEqual(task1); - void expect(buffer.add(producer2)).rejects.toThrowError(); + const add2 = buffer.add(producer2); + expect(buffer.$.length).toEqual(1); + + await expect(add2).rejects.toThrowError(); + expect(producer1).toHaveBeenCalled(); expect(producer2).not.toHaveBeenCalled(); }); @@ -159,6 +167,21 @@ describe('PromiseBuffer', () => { expect(result).toEqual(true); expect(buffer.$.length).toEqual(0); }); + + test('resolves even if one of the promises rejects', async () => { + const buffer = makePromiseBuffer(); + const p1 = vi.fn(() => new Promise(resolve => setTimeout(resolve, 1))); + const p2 = vi.fn(() => new Promise((_, reject) => setTimeout(() => reject(new Error('whoops')), 1))); + void buffer.add(p1); + void buffer.add(p2); + + const result = await buffer.drain(); + expect(result).toEqual(true); + expect(buffer.$.length).toEqual(0); + + expect(p1).toHaveBeenCalled(); + expect(p2).toHaveBeenCalled(); + }); }); test('resolved promises should not show up in buffer length', async () => {