Skip to content

Commit

Permalink
Revert "Merge pull request #106 from MattiasBuelens/fix-pipe-tests"
Browse files Browse the repository at this point in the history
This reverts commit 354aabb, reversing
changes made to 4f231a4.
  • Loading branch information
MattiasBuelens committed Nov 9, 2022
1 parent a778419 commit 07c5180
Showing 1 changed file with 48 additions and 97 deletions.
145 changes: 48 additions & 97 deletions src/lib/readable-stream/pipe.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { ReadableStreamState } from '../readable-stream';
import { IsReadableStream } from '../readable-stream';
import type { WritableStreamState } from '../writable-stream';
import { IsWritableStream, WritableStreamCloseQueuedOrInFlight } from '../writable-stream';
import { IsWritableStream } from '../writable-stream';
import type { ReadableStreamLike, WritableStreamLike } from '../helpers/stream-like';
import { IsReadableStreamLike, IsWritableStreamLike } from '../helpers/stream-like';
import assert from '../../stub/assert';
Expand All @@ -10,7 +10,6 @@ import {
PerformPromiseThen,
promiseRejectedWith,
promiseResolvedWith,
queueMicrotask,
setPromiseIsHandledToTrue,
transformPromiseWith,
uponFulfillment,
Expand Down Expand Up @@ -45,7 +44,6 @@ export function ReadableStreamPipeTo<T>(source: ReadableStreamLike<T>,
let shuttingDown = false;
let released = false;
let sourceState: ReadableStreamState = 'readable';
let sourceStoredError: any;
let destState: WritableStreamState = 'writable';
let destStoredError: any;
let destCloseRequested = false;
Expand All @@ -58,8 +56,8 @@ export function ReadableStreamPipeTo<T>(source: ReadableStreamLike<T>,
});

// This is used to keep track of the spec's requirement that we wait for ongoing reads and writes during shutdown.
let currentRead: Promise<unknown> | undefined;
let currentWrite: Promise<unknown> | undefined;
let currentRead = promiseResolvedWith<unknown>(undefined);
let currentWrite = promiseResolvedWith<unknown>(undefined);

return newPromise((resolve, reject) => {
let abortAlgorithm: () => void;
Expand Down Expand Up @@ -137,17 +135,13 @@ export function ReadableStreamPipeTo<T>(source: ReadableStreamLike<T>,
});
}

function handleSourceClose(): null {
uponPromise(reader.closed, () => {
// Closing must be propagated forward
assert(!released);
assert(!IsReadableStream(source) || source._state === 'closed');
sourceState = 'closed';
if (!preventClose) {
shutdownWithAction(() => {
if (IsWritableStream(dest)) {
destCloseRequested = WritableStreamCloseQueuedOrInFlight(dest);
destState = dest._state;
}
if (destCloseRequested || destState === 'closed') {
return promiseResolvedWith(undefined);
}
Expand All @@ -157,37 +151,32 @@ export function ReadableStreamPipeTo<T>(source: ReadableStreamLike<T>,
assert(destState === 'writable' || destState === 'erroring');
destCloseRequested = true;
return writer.close();
}, false, undefined, true);
});
} else {
shutdown();
}
return null;
}

function handleSourceError(storedError: any): null {
}, storedError => {
if (released) {
return null;
}
// Errors must be propagated forward
assert(!IsReadableStream(source) || source._state === 'errored');
sourceState = 'errored';
sourceStoredError = storedError;
if (!preventAbort) {
shutdownWithAction(() => writer.abort(storedError), true, storedError);
} else {
shutdown(true, storedError);
}
return null;
}
});

function handleDestClose(): null {
uponPromise(writer.closed, () => {
assert(!released);
assert(!IsWritableStream(dest) || dest._state === 'closed');
destState = 'closed';
return null;
}

function handleDestError(storedError: any): null {
}, storedError => {
if (released) {
return null;
}
Expand All @@ -201,62 +190,27 @@ export function ReadableStreamPipeTo<T>(source: ReadableStreamLike<T>,
shutdown(true, storedError);
}
return null;
}

// If we're using our own stream implementations, synchronously inspect their state.
if (IsReadableStream(source)) {
sourceState = source._state;
sourceStoredError = source._storedError;
}
if (IsWritableStream(dest)) {
destState = dest._state;
destStoredError = dest._storedError;
destCloseRequested = WritableStreamCloseQueuedOrInFlight(dest);
}
});

// If we synchronously inspected the stream's state, then we can shutdown immediately.
if (IsReadableStream(source) && IsWritableStream(dest)) {
// The reference implementation uses `queueMicrotask()` here, but that is not sufficient to detect
// closes or errors through `reader.closed` or `writer.closed`.
setTimeout(() => {
started = true;
resolveStart();
}

if (sourceState === 'errored') {
// Errors must be propagated forward
handleSourceError(sourceStoredError);
} else if (destState === 'errored') {
// Errors must be propagated backward
handleDestError(destStoredError);
} else if (sourceState === 'closed') {
// Closing must be propagated forward
handleSourceClose();
} else if (destCloseRequested || destState === 'closed') {
// Closing must be propagated backward
const destClosed = new TypeError('the destination writable stream closed before all data could be piped to it');
if (!preventCancel) {
shutdownWithAction(() => reader.cancel(destClosed), true, destClosed);
} else {
shutdown(true, destClosed);
}
}
if (destCloseRequested || destState === 'closed') {
const destClosed = new TypeError('the destination writable stream closed before all data could be piped to it');

// Detect asynchronous state transitions.
if (!shuttingDown) {
uponPromise(reader.closed, handleSourceClose, handleSourceError);
uponPromise(writer.closed, handleDestClose, handleDestError);
}
if (!preventCancel) {
shutdownWithAction(() => reader.cancel(destClosed), true, destClosed);
} else {
shutdown(true, destClosed);
}
}

// If we synchronously inspected the stream's state, then we can start the loop immediately.
// Otherwise, we give `reader.closed` or `writer.closed` a little bit of time to settle.
if (started) {
pipeLoop();
} else {
queueMicrotask(() => {
started = true;
resolveStart();

pipeLoop();
});
}
}, 0);

function waitForWritesToFinish(): Promise<void> {
let oldCurrentWrite: Promise<unknown> | undefined;
Expand All @@ -267,7 +221,7 @@ export function ReadableStreamPipeTo<T>(source: ReadableStreamLike<T>,
// so we have to be sure to wait for that too.
if (oldCurrentWrite !== currentWrite) {
oldCurrentWrite = currentWrite;
return transformPromiseWith(currentWrite!, check, check);
return transformPromiseWith(currentWrite, check, check);
}
return undefined;
}
Expand All @@ -283,20 +237,17 @@ export function ReadableStreamPipeTo<T>(source: ReadableStreamLike<T>,
// so we have to be sure to wait for that too.
if (oldCurrentRead !== currentRead) {
oldCurrentRead = currentRead;
return transformPromiseWith(currentRead!, check, check);
return transformPromiseWith(currentRead, check, check);
}
if (oldCurrentWrite !== currentWrite) {
oldCurrentWrite = currentWrite;
return transformPromiseWith(currentWrite!, check, check);
return transformPromiseWith(currentWrite, check, check);
}
return undefined;
}
}

function shutdownWithAction(action: (() => Promise<unknown>) | undefined,
isError?: boolean,
error?: any,
waitForReads?: boolean) {
function shutdownWithAction(action: () => Promise<unknown>, originalIsError?: boolean, originalError?: any) {
if (shuttingDown) {
return;
}
Expand All @@ -309,40 +260,40 @@ export function ReadableStreamPipeTo<T>(source: ReadableStreamLike<T>,
}

function onStart(): null {
if (waitForReads && (currentRead !== undefined || currentWrite !== undefined)) {
uponFulfillment(waitForReadsAndWritesToFinish(), doTheRest);
} else if (currentWrite !== undefined) {
uponFulfillment(waitForWritesToFinish(), doTheRest);
} else {
doTheRest();
}
uponFulfillment(waitForWritesToFinish(), doTheRest);
return null;
}

function doTheRest(): null {
if (action) {
uponPromise(
action(),
() => waitForReadsAndWritesThenFinalize(isError, error),
newError => waitForReadsAndWritesThenFinalize(true, newError)
);
} else {
waitForReadsAndWritesThenFinalize(isError, error);
}
uponPromise(
action(),
() => waitForReadsAndWritesThenFinalize(originalIsError, originalError),
newError => waitForReadsAndWritesThenFinalize(true, newError)
);
return null;
}
}

function shutdown(isError?: boolean, error?: any) {
shutdownWithAction(undefined, isError, error);
}
if (shuttingDown) {
return;
}
shuttingDown = true;

function waitForReadsAndWritesThenFinalize(isError?: boolean, error?: any): null {
if (currentRead !== undefined || currentWrite !== undefined) {
uponFulfillment(waitForReadsAndWritesToFinish(), () => finalize(isError, error));
if (!started) {
uponFulfillment(startPromise, onStart);
} else {
finalize(isError, error);
onStart();
}

function onStart(): null {
waitForReadsAndWritesThenFinalize(isError, error);
return null;
}
}

function waitForReadsAndWritesThenFinalize(isError?: boolean, error?: any): null {
uponFulfillment(waitForReadsAndWritesToFinish(), () => finalize(isError, error));
return null;
}

Expand Down

0 comments on commit 07c5180

Please sign in to comment.