diff --git a/reference-implementation/lib/abstract-ops/miscellaneous.js b/reference-implementation/lib/abstract-ops/miscellaneous.js index ed55a5268..d4d9a054f 100644 --- a/reference-implementation/lib/abstract-ops/miscellaneous.js +++ b/reference-implementation/lib/abstract-ops/miscellaneous.js @@ -24,3 +24,14 @@ exports.CloneAsUint8Array = O => { exports.StructuredTransferOrClone = (value, transferList) => { return globalThis.structuredClone(value, { transfer: transferList }); }; + +exports.RunCloseSteps = value => { + if (typeof value.close === 'function') { + return; + } + try { + value.close(); + } catch (closeException) { + // Nothing to do. + } +}; diff --git a/reference-implementation/lib/abstract-ops/queue-with-sizes.js b/reference-implementation/lib/abstract-ops/queue-with-sizes.js index 4cc668e57..2a0d5b21d 100644 --- a/reference-implementation/lib/abstract-ops/queue-with-sizes.js +++ b/reference-implementation/lib/abstract-ops/queue-with-sizes.js @@ -1,6 +1,6 @@ 'use strict'; const assert = require('assert'); -const { IsNonNegativeNumber, StructuredTransferOrClone } = require('./miscellaneous.js'); +const { IsNonNegativeNumber, RunCloseSteps, StructuredTransferOrClone } = require('./miscellaneous.js'); exports.DequeueValue = container => { assert('_queue' in container && '_queueTotalSize' in container); @@ -24,7 +24,7 @@ exports.EnqueueValueWithSize = (container, value, size, transferList) => { if (size === Infinity) { throw new RangeError('Size must be a finite, non-NaN, non-negative number.'); } - if (container._isOwning) { + if (container._isOwning && !container._isPipeToOptimizedTransfer) { value = StructuredTransferOrClone(value, transferList); } container._queue.push({ value, size }); @@ -45,13 +45,7 @@ exports.ResetQueue = container => { if (container._isOwning) { while (container._queue.length > 0) { const value = exports.DequeueValue(container); - if (typeof value.close === 'function') { - try { - value.close(); - } catch (closeException) { - // Nothing to do. - } - } + RunCloseSteps(value); } } container._queue = []; diff --git a/reference-implementation/lib/abstract-ops/readable-streams.js b/reference-implementation/lib/abstract-ops/readable-streams.js index 98b0a35fb..b26223943 100644 --- a/reference-implementation/lib/abstract-ops/readable-streams.js +++ b/reference-implementation/lib/abstract-ops/readable-streams.js @@ -6,7 +6,8 @@ const { promiseResolvedWith, promiseRejectedWith, newPromise, resolvePromise, re require('../helpers/webidl.js'); const { CanTransferArrayBuffer, CopyDataBlockBytes, CreateArrayFromList, IsDetachedBuffer, TransferArrayBuffer } = require('./ecmascript.js'); -const { CloneAsUint8Array, IsNonNegativeNumber, StructuredTransferOrClone } = require('./miscellaneous.js'); +const { CloneAsUint8Array, IsNonNegativeNumber, RunCloseSteps, StructuredTransferOrClone } = + require('./miscellaneous.js'); const { EnqueueValueWithSize, ResetQueue } = require('./queue-with-sizes.js'); const { AcquireWritableStreamDefaultWriter, IsWritableStreamLocked, WritableStreamAbort, WritableStreamDefaultWriterCloseWithErrorPropagation, WritableStreamDefaultWriterRelease, @@ -136,6 +137,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC const reader = AcquireReadableStreamDefaultReader(source); const writer = AcquireWritableStreamDefaultWriter(dest); + writer._stream._controller._isPipeToOptimizedTransfer = source._controller._isOwning && dest._controller._isOwning; source._disturbed = true; @@ -206,7 +208,11 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC { chunkSteps: chunk => { currentWrite = transformPromiseWith( - WritableStreamDefaultWriterWrite(writer, chunk), undefined, () => {} + WritableStreamDefaultWriterWrite(writer, chunk), undefined, () => { + if (reader._stream._controller._isOwning) { + RunCloseSteps(chunk); + } + } ); resolveRead(false); }, @@ -319,6 +325,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC } function finalize(isError, error) { + writer._stream._controller._isPipeToOptimizedTransfer = undefined; WritableStreamDefaultWriterRelease(writer); ReadableStreamDefaultReaderRelease(reader);