Skip to content

Commit

Permalink
stream: improve readable webstream pipeTo
Browse files Browse the repository at this point in the history
PR-URL: #49690
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Yagiz Nizipli <yagiz@nizipli.com>
Reviewed-By: Moshe Atlow <moshe@atlow.co.il>
  • Loading branch information
rluvaton authored and ruyadorno committed Sep 28, 2023
1 parent a304d1e commit b29d927
Showing 1 changed file with 34 additions and 17 deletions.
51 changes: 34 additions & 17 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ const {
ObjectCreate,
ObjectDefineProperties,
ObjectSetPrototypeOf,
Promise,
PromisePrototypeThen,
PromiseResolve,
PromiseReject,
Expand Down Expand Up @@ -1354,7 +1353,9 @@ function readableStreamPipeTo(

const promise = createDeferredPromise();

let currentWrite = PromiseResolve();
const state = {
currentWrite: PromiseResolve(),
};

// The error here can be undefined. The rejected arg
// tells us that the promise must be rejected even
Expand All @@ -1371,9 +1372,9 @@ function readableStreamPipeTo(
}

async function waitForCurrentWrite() {
const write = currentWrite;
const write = state.currentWrite;
await write;
if (write !== currentWrite)
if (write !== state.currentWrite)
await waitForCurrentWrite();
}

Expand Down Expand Up @@ -1464,20 +1465,14 @@ function readableStreamPipeTo(
async function step() {
if (shuttingDown)
return true;

await writer[kState].ready.promise;
return new Promise((resolve, reject) => {
readableStreamDefaultReaderRead(
reader,
{
[kChunk](chunk) {
currentWrite = writableStreamDefaultWriterWrite(writer, chunk);
setPromiseHandled(currentWrite);
resolve(false);
},
[kClose]: () => resolve(true),
[kError]: reject,
});
});

const promise = createDeferredPromise();
// eslint-disable-next-line no-use-before-define
readableStreamDefaultReaderRead(reader, new PipeToReadableStreamReadRequest(writer, state, promise));

return promise.promise;
}

async function run() {
Expand Down Expand Up @@ -1539,6 +1534,28 @@ function readableStreamPipeTo(
return promise.promise;
}

class PipeToReadableStreamReadRequest {
constructor(writer, state, promise) {
this.writer = writer;
this.state = state;
this.promise = promise;
}

[kChunk](chunk) {
this.state.currentWrite = writableStreamDefaultWriterWrite(this.writer, chunk);
setPromiseHandled(this.state.currentWrite);
this.promise.resolve(false);
}

[kClose]() {
this.promise.resolve(true);
}

[kError](error) {
this.promise.reject(error);
}
}

function readableStreamTee(stream, cloneForBranch2) {
if (isReadableByteStreamController(stream[kState].controller)) {
return readableByteStreamTee(stream);
Expand Down

0 comments on commit b29d927

Please sign in to comment.