Skip to content

Commit

Permalink
filter items in the beginning
Browse files Browse the repository at this point in the history
  • Loading branch information
Liza K committed Nov 22, 2020
1 parent 4bd2a96 commit e9103b7
Showing 1 changed file with 20 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,28 @@ export const createStreamingBatchedFunction = <Payload, Result extends object>(
},
onBatch: async (items) => {
try {
// Filter out any items whose signal is already aborted
items = items.filter((item) => {
if (item.signal?.aborted) item.future.reject(new AbortError());
return !item.signal?.aborted;
});

const donePromises: Array<Promise<any>> = items.map((item) => {
if (!item.signal) return item.future.promise.catch(() => {});
return new Promise<void>((resolve) => {
const { promise: abortPromise, cleanup } = item.signal
? abortSignalToPromise(item.signal)
: {
promise: undefined,
cleanup: () => {},
};

// Reject promise if aborted
const { promise: abortPromise, cleanup } = abortSignalToPromise(item.signal);
abortPromise.catch(() => {
item.future.reject(new AbortError());
cleanup();
const onDone = () => {
resolve();
cleanup();
};
if (abortPromise) abortPromise.catch(onDone);
item.future.promise.then(onDone, onDone);
});
return item.future.promise.then(cleanup, cleanup);
});

// abort when all items were either resolved, rejected or aborted
Expand All @@ -107,10 +119,7 @@ export const createStreamingBatchedFunction = <Payload, Result extends object>(
isBatchDone = true;
abortController.abort();
});
const batch = items
// Filter out any items whose signal is already aborted
.filter((item) => !item.signal?.aborted)
.map((item) => item.payload);
const batch = items.map((item) => item.payload);

const { stream } = fetchStreamingInjected({
url,
Expand Down

0 comments on commit e9103b7

Please sign in to comment.