diff --git a/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts b/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts index 7bd258f3cc7e3..fe5771bef795b 100644 --- a/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts +++ b/src/plugins/bfetch/public/batching/create_streaming_batched_function.ts @@ -88,16 +88,28 @@ export const createStreamingBatchedFunction = ( }, onBatch: async (items) => { try { + // Filter out any items whose signal is already aborted + items = items.filter((item) => { + if (item.signal?.aborted) item.future.reject(new AbortError()); + return !item.signal?.aborted; + }); + const donePromises: Array> = items.map((item) => { - if (!item.signal) return item.future.promise.catch(() => {}); + return new Promise((resolve) => { + const { promise: abortPromise, cleanup } = item.signal + ? abortSignalToPromise(item.signal) + : { + promise: undefined, + cleanup: () => {}, + }; - // Reject promise if aborted - const { promise: abortPromise, cleanup } = abortSignalToPromise(item.signal); - abortPromise.catch(() => { - item.future.reject(new AbortError()); - cleanup(); + const onDone = () => { + resolve(); + cleanup(); + }; + if (abortPromise) abortPromise.catch(onDone); + item.future.promise.then(onDone, onDone); }); - return item.future.promise.then(cleanup, cleanup); }); // abort when all items were either resolved, rejected or aborted @@ -107,10 +119,7 @@ export const createStreamingBatchedFunction = ( isBatchDone = true; abortController.abort(); }); - const batch = items - // Filter out any items whose signal is already aborted - .filter((item) => !item.signal?.aborted) - .map((item) => item.payload); + const batch = items.map((item) => item.payload); const { stream } = fetchStreamingInjected({ url,