Skip to content

Commit

Permalink
Wait for both reads and writes to finish before finalizing pipe
Browse files Browse the repository at this point in the history
  • Loading branch information
MattiasBuelens committed Oct 26, 2021
1 parent 392eb5e commit d77b1dd
Showing 1 changed file with 49 additions and 19 deletions.
68 changes: 49 additions & 19 deletions src/lib/readable-stream/pipe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import {
promiseResolvedWith,
queueMicrotask,
setPromiseIsHandledToTrue,
transformPromiseWith,
uponFulfillment,
uponPromise
} from '../helpers/webidl';
import { noop } from '../../utils';
import type { AbortSignal } from '../abort-signal';
import { isAbortSignal } from '../abort-signal';
import { DOMException } from '../../stub/dom-exception';
Expand Down Expand Up @@ -51,8 +51,9 @@ export function ReadableStreamPipeTo<T>(source: ReadableStream<T>,
resolveStart = resolve;
});

// This is used to keep track of the spec's requirement that we wait for ongoing writes during shutdown.
let currentWrite = promiseResolvedWith<void>(undefined);
// This is used to keep track of the spec's requirement that we wait for ongoing reads and writes during shutdown.
let currentRead = promiseResolvedWith<unknown>(undefined);
let currentWrite = promiseResolvedWith<unknown>(undefined);

return newPromise((resolve, reject) => {
let abortAlgorithm: () => void;
Expand Down Expand Up @@ -117,13 +118,16 @@ export function ReadableStreamPipeTo<T>(source: ReadableStream<T>,
}

return PerformPromiseThen(writer.ready, () => {
return PerformPromiseThen(reader.read(), result => {
const read = PerformPromiseThen(reader.read(), result => {
if (result.done) {
return true;
}
currentWrite = PerformPromiseThen(writer.write(result.value), undefined, noop);
currentWrite = writer.write(result.value);
setPromiseIsHandledToTrue(currentWrite);
return false;
});
currentRead = read;
return read;
});
}

Expand Down Expand Up @@ -200,13 +204,38 @@ export function ReadableStreamPipeTo<T>(source: ReadableStream<T>,
});

function waitForWritesToFinish(): Promise<void> {
// Another write may have started while we were waiting on this currentWrite, so we have to be sure to wait
// for that too.
const oldCurrentWrite = currentWrite;
return PerformPromiseThen(
currentWrite,
() => oldCurrentWrite !== currentWrite ? waitForWritesToFinish() : undefined
);
let oldCurrentWrite: Promise<unknown> | undefined;
return promiseResolvedWith(check());

function check(): undefined | Promise<undefined> {
// Another write may have started while we were waiting on this currentWrite,
// so we have to be sure to wait for that too.
if (oldCurrentWrite !== currentWrite) {
oldCurrentWrite = currentWrite;
return transformPromiseWith(currentWrite, check, check);
}
return undefined;
}
}

function waitForReadsAndWritesToFinish(): Promise<void> {
let oldCurrentRead: Promise<unknown> | undefined;
let oldCurrentWrite: Promise<unknown> | undefined;
return promiseResolvedWith(check());

function check(): undefined | Promise<undefined> {
// Another read or write may have started while we were waiting on this currentRead or currentWrite,
// so we have to be sure to wait for that too.
if (oldCurrentRead !== currentRead) {
oldCurrentRead = currentRead;
return transformPromiseWith(currentRead, check, check);
}
if (oldCurrentWrite !== currentWrite) {
oldCurrentWrite = currentWrite;
return transformPromiseWith(currentWrite, check, check);
}
return undefined;
}
}

function shutdownWithAction(action: () => Promise<unknown>, originalIsError?: boolean, originalError?: any) {
Expand All @@ -233,8 +262,8 @@ export function ReadableStreamPipeTo<T>(source: ReadableStream<T>,
function doTheRest(): null {
uponPromise(
action(),
() => finalize(originalIsError, originalError),
newError => finalize(true, newError)
() => waitForReadsAndWritesThenFinalize(originalIsError, originalError),
newError => waitForReadsAndWritesThenFinalize(true, newError)
);
return null;
}
Expand All @@ -253,15 +282,16 @@ export function ReadableStreamPipeTo<T>(source: ReadableStream<T>,
}

function onStart(): null {
if (destState === 'writable' && !WritableStreamCloseQueuedOrInFlight(dest)) {
uponFulfillment(waitForWritesToFinish(), () => finalize(isError, error));
} else {
finalize(isError, error);
}
waitForReadsAndWritesThenFinalize(isError, error);
return null;
}
}

function waitForReadsAndWritesThenFinalize(isError?: boolean, error?: any): null {
uponFulfillment(waitForReadsAndWritesToFinish(), () => finalize(isError, error));
return null;
}

function finalize(isError?: boolean, error?: any): null {
released = true;
writer.releaseLock();
Expand Down

0 comments on commit d77b1dd

Please sign in to comment.