Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix pipeTo() tests #106

Merged
merged 8 commits into from
Nov 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 97 additions & 48 deletions src/lib/readable-stream/pipe.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { ReadableStreamState } from '../readable-stream';
import { IsReadableStream } from '../readable-stream';
import type { WritableStreamState } from '../writable-stream';
import { IsWritableStream } from '../writable-stream';
import { IsWritableStream, WritableStreamCloseQueuedOrInFlight } from '../writable-stream';
import type { ReadableStreamLike, WritableStreamLike } from '../helpers/stream-like';
import { IsReadableStreamLike, IsWritableStreamLike } from '../helpers/stream-like';
import assert from '../../stub/assert';
Expand All @@ -10,6 +10,7 @@ import {
PerformPromiseThen,
promiseRejectedWith,
promiseResolvedWith,
queueMicrotask,
setPromiseIsHandledToTrue,
transformPromiseWith,
uponFulfillment,
Expand Down Expand Up @@ -44,6 +45,7 @@ export function ReadableStreamPipeTo<T>(source: ReadableStreamLike<T>,
let shuttingDown = false;
let released = false;
let sourceState: ReadableStreamState = 'readable';
let sourceStoredError: any;
let destState: WritableStreamState = 'writable';
let destStoredError: any;
let destCloseRequested = false;
Expand All @@ -56,8 +58,8 @@ export function ReadableStreamPipeTo<T>(source: ReadableStreamLike<T>,
});

// This is used to keep track of the spec's requirement that we wait for ongoing reads and writes during shutdown.
let currentRead = promiseResolvedWith<unknown>(undefined);
let currentWrite = promiseResolvedWith<unknown>(undefined);
let currentRead: Promise<unknown> | undefined;
let currentWrite: Promise<unknown> | undefined;

return newPromise((resolve, reject) => {
let abortAlgorithm: () => void;
Expand Down Expand Up @@ -135,13 +137,17 @@ export function ReadableStreamPipeTo<T>(source: ReadableStreamLike<T>,
});
}

uponPromise(reader.closed, () => {
function handleSourceClose(): null {
// Closing must be propagated forward
assert(!released);
assert(!IsReadableStream(source) || source._state === 'closed');
sourceState = 'closed';
if (!preventClose) {
shutdownWithAction(() => {
if (IsWritableStream(dest)) {
destCloseRequested = WritableStreamCloseQueuedOrInFlight(dest);
destState = dest._state;
}
if (destCloseRequested || destState === 'closed') {
return promiseResolvedWith(undefined);
}
Expand All @@ -151,32 +157,37 @@ export function ReadableStreamPipeTo<T>(source: ReadableStreamLike<T>,
assert(destState === 'writable' || destState === 'erroring');
destCloseRequested = true;
return writer.close();
});
}, false, undefined, true);
} else {
shutdown();
}
return null;
}, storedError => {
}

function handleSourceError(storedError: any): null {
if (released) {
return null;
}
// Errors must be propagated forward
assert(!IsReadableStream(source) || source._state === 'errored');
sourceState = 'errored';
sourceStoredError = storedError;
if (!preventAbort) {
shutdownWithAction(() => writer.abort(storedError), true, storedError);
} else {
shutdown(true, storedError);
}
return null;
});
}

uponPromise(writer.closed, () => {
function handleDestClose(): null {
assert(!released);
assert(!IsWritableStream(dest) || dest._state === 'closed');
destState = 'closed';
return null;
}, storedError => {
}

function handleDestError(storedError: any): null {
if (released) {
return null;
}
Expand All @@ -190,27 +201,62 @@ export function ReadableStreamPipeTo<T>(source: ReadableStreamLike<T>,
shutdown(true, storedError);
}
return null;
});
}

// If we're using our own stream implementations, synchronously inspect their state.
if (IsReadableStream(source)) {
sourceState = source._state;
sourceStoredError = source._storedError;
}
if (IsWritableStream(dest)) {
destState = dest._state;
destStoredError = dest._storedError;
destCloseRequested = WritableStreamCloseQueuedOrInFlight(dest);
}

// The reference implementation uses `queueMicrotask()` here, but that is not sufficient to detect
// closes or errors through `reader.closed` or `writer.closed`.
setTimeout(() => {
// If we synchronously inspected the stream's state, then we can shutdown immediately.
if (IsReadableStream(source) && IsWritableStream(dest)) {
started = true;
resolveStart();
}

if (sourceState === 'errored') {
// Errors must be propagated forward
handleSourceError(sourceStoredError);
} else if (destState === 'errored') {
// Errors must be propagated backward
handleDestError(destStoredError);
} else if (sourceState === 'closed') {
// Closing must be propagated forward
handleSourceClose();
} else if (destCloseRequested || destState === 'closed') {
// Closing must be propagated backward
if (destCloseRequested || destState === 'closed') {
const destClosed = new TypeError('the destination writable stream closed before all data could be piped to it');

if (!preventCancel) {
shutdownWithAction(() => reader.cancel(destClosed), true, destClosed);
} else {
shutdown(true, destClosed);
}
const destClosed = new TypeError('the destination writable stream closed before all data could be piped to it');
if (!preventCancel) {
shutdownWithAction(() => reader.cancel(destClosed), true, destClosed);
} else {
shutdown(true, destClosed);
}
}

// Detect asynchronous state transitions.
if (!shuttingDown) {
uponPromise(reader.closed, handleSourceClose, handleSourceError);
uponPromise(writer.closed, handleDestClose, handleDestError);
}

// If we synchronously inspected the stream's state, then we can start the loop immediately.
// Otherwise, we give `reader.closed` or `writer.closed` a little bit of time to settle.
if (started) {
pipeLoop();
}, 0);
} else {
queueMicrotask(() => {
started = true;
resolveStart();

pipeLoop();
});
}

function waitForWritesToFinish(): Promise<void> {
let oldCurrentWrite: Promise<unknown> | undefined;
Expand All @@ -221,7 +267,7 @@ export function ReadableStreamPipeTo<T>(source: ReadableStreamLike<T>,
// so we have to be sure to wait for that too.
if (oldCurrentWrite !== currentWrite) {
oldCurrentWrite = currentWrite;
return transformPromiseWith(currentWrite, check, check);
return transformPromiseWith(currentWrite!, check, check);
}
return undefined;
}
Expand All @@ -237,17 +283,20 @@ export function ReadableStreamPipeTo<T>(source: ReadableStreamLike<T>,
// so we have to be sure to wait for that too.
if (oldCurrentRead !== currentRead) {
oldCurrentRead = currentRead;
return transformPromiseWith(currentRead, check, check);
return transformPromiseWith(currentRead!, check, check);
}
if (oldCurrentWrite !== currentWrite) {
oldCurrentWrite = currentWrite;
return transformPromiseWith(currentWrite, check, check);
return transformPromiseWith(currentWrite!, check, check);
}
return undefined;
}
}

function shutdownWithAction(action: () => Promise<unknown>, originalIsError?: boolean, originalError?: any) {
function shutdownWithAction(action: (() => Promise<unknown>) | undefined,
isError?: boolean,
error?: any,
waitForReads?: boolean) {
if (shuttingDown) {
return;
}
Expand All @@ -260,40 +309,40 @@ export function ReadableStreamPipeTo<T>(source: ReadableStreamLike<T>,
}

function onStart(): null {
uponFulfillment(waitForWritesToFinish(), doTheRest);
if (waitForReads && (currentRead !== undefined || currentWrite !== undefined)) {
uponFulfillment(waitForReadsAndWritesToFinish(), doTheRest);
} else if (currentWrite !== undefined) {
uponFulfillment(waitForWritesToFinish(), doTheRest);
} else {
doTheRest();
}
return null;
}

function doTheRest(): null {
uponPromise(
action(),
() => waitForReadsAndWritesThenFinalize(originalIsError, originalError),
newError => waitForReadsAndWritesThenFinalize(true, newError)
);
if (action) {
uponPromise(
action(),
() => waitForReadsAndWritesThenFinalize(isError, error),
newError => waitForReadsAndWritesThenFinalize(true, newError)
);
} else {
waitForReadsAndWritesThenFinalize(isError, error);
}
return null;
}
}

function shutdown(isError?: boolean, error?: any) {
if (shuttingDown) {
return;
}
shuttingDown = true;

if (!started) {
uponFulfillment(startPromise, onStart);
} else {
onStart();
}

function onStart(): null {
waitForReadsAndWritesThenFinalize(isError, error);
return null;
}
shutdownWithAction(undefined, isError, error);
}

function waitForReadsAndWritesThenFinalize(isError?: boolean, error?: any): null {
uponFulfillment(waitForReadsAndWritesToFinish(), () => finalize(isError, error));
if (currentRead !== undefined || currentWrite !== undefined) {
uponFulfillment(waitForReadsAndWritesToFinish(), () => finalize(isError, error));
} else {
finalize(isError, error);
}
return null;
}

Expand Down
79 changes: 0 additions & 79 deletions test/unit/piping/multiple-propagation.spec.js

This file was deleted.

6 changes: 0 additions & 6 deletions test/wpt/shared/exclusions.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,6 @@ const ignoredFailuresBase = {
// Our async iterator won't extend from the built-in %AsyncIteratorPrototype%
'readable-streams/async-iterator.any.html': [
'Async iterator instances should have the correct list of properties'
],
'piping/multiple-propagation.any.html': [
// FIXME Detect erroring writable stream before errored readable stream somehow?
'Piping from an errored readable stream to an erroring writable stream',
// FIXME Detect closing/closed writable stream before errored readable stream somehow?
'Piping from an errored readable stream to a closing writable stream'
]
};

Expand Down