Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for writablestream owning type #1272

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ jobs:
submodules: true
- uses: actions/setup-node@v1
with:
node-version: 14
node-version: 19
- run: npm install
- run: npm test
171 changes: 124 additions & 47 deletions index.bs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion reference-implementation/lib/ReadableStream-impl.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ exports.implementation = class ReadableStreamImpl {
this, underlyingSource, underlyingSourceDict, highWaterMark
);
} else {
assert(!('type' in underlyingSourceDict));
assert(!('type' in underlyingSourceDict) || underlyingSourceDict.type === 'owning');
const sizeAlgorithm = ExtractSizeAlgorithm(strategy);
const highWaterMark = ExtractHighWaterMark(strategy, 1);
aos.SetUpReadableStreamDefaultControllerFromUnderlyingSource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ exports.implementation = class ReadableStreamDefaultControllerImpl {
aos.ReadableStreamDefaultControllerClose(this);
}

enqueue(chunk) {
enqueue(chunk, options) {
const transferList = options ? options.transfer : undefined;
if (aos.ReadableStreamDefaultControllerCanCloseOrEnqueue(this) === false) {
throw new TypeError('The stream is not in a state that permits enqueue');
}

return aos.ReadableStreamDefaultControllerEnqueue(this, chunk);
return aos.ReadableStreamDefaultControllerEnqueue(this, chunk, transferList);
}

error(e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
dictionary StructuredSerializeOptions {
sequence<object> transfer = [];
};

[Exposed=(Window,Worker,Worklet)]
interface ReadableStreamDefaultController {
readonly attribute unrestricted double? desiredSize;

undefined close();
undefined enqueue(optional any chunk);
undefined enqueue(optional any chunk, optional StructuredSerializeOptions options = { });
undefined error(optional any e);
};
4 changes: 3 additions & 1 deletion reference-implementation/lib/UnderlyingSink.webidl
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ dictionary UnderlyingSink {
UnderlyingSinkWriteCallback write;
UnderlyingSinkCloseCallback close;
UnderlyingSinkAbortCallback abort;
any type;
WritableStreamType type;
};

callback UnderlyingSinkStartCallback = any (WritableStreamDefaultController controller);
callback UnderlyingSinkWriteCallback = Promise<undefined> (any chunk, WritableStreamDefaultController controller);
callback UnderlyingSinkCloseCallback = Promise<undefined> ();
callback UnderlyingSinkAbortCallback = Promise<undefined> (optional any reason);

enum WritableStreamType { "owning" };
2 changes: 1 addition & 1 deletion reference-implementation/lib/UnderlyingSource.webidl
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ callback UnderlyingSourceStartCallback = any (ReadableStreamController controlle
callback UnderlyingSourcePullCallback = Promise<undefined> (ReadableStreamController controller);
callback UnderlyingSourceCancelCallback = Promise<undefined> (optional any reason);

enum ReadableStreamType { "bytes" };
enum ReadableStreamType { "bytes", "owning" };
2 changes: 1 addition & 1 deletion reference-implementation/lib/WritableStream-impl.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ exports.implementation = class WritableStreamImpl {
underlyingSink = null;
}
const underlyingSinkDict = UnderlyingSink.convert(underlyingSink);
if ('type' in underlyingSinkDict) {
if ('type' in underlyingSinkDict && underlyingSinkDict.type !== 'owning') {
throw new RangeError('Invalid type is specified');
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,13 @@ exports.implementation = class WritableStreamDefaultWriterImpl {
aos.WritableStreamDefaultWriterRelease(this);
}

write(chunk) {
write(chunk, options) {
const transferList = options ? options.transfer : undefined;
if (this._stream === undefined) {
return promiseRejectedWith(defaultWriterLockException('write to'));
}

return aos.WritableStreamDefaultWriterWrite(this, chunk);
return aos.WritableStreamDefaultWriterWrite(this, chunk, transferList);
}
};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
dictionary StructuredSerializeOptions {
sequence<object> transfer = [];
};

[Exposed=(Window,Worker,Worklet)]
interface WritableStreamDefaultWriter {
constructor(WritableStream stream);
Expand All @@ -9,5 +13,5 @@ interface WritableStreamDefaultWriter {
Promise<undefined> abort(optional any reason);
Promise<undefined> close();
undefined releaseLock();
Promise<undefined> write(optional any chunk);
Promise<undefined> write(optional any chunk, optional StructuredSerializeOptions options = { });
};
15 changes: 15 additions & 0 deletions reference-implementation/lib/abstract-ops/miscellaneous.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,18 @@ exports.CloneAsUint8Array = O => {
const buffer = O.buffer.slice(O.byteOffset, O.byteOffset + O.byteLength);
return new Uint8Array(buffer);
};

exports.StructuredTransferOrClone = (value, transferList) => {
return globalThis.structuredClone(value, { transfer: transferList });
};

exports.RunCloseSteps = value => {
if (typeof value.close === 'function') {
return;
}
try {
value.close();
} catch (closeException) {
// Nothing to do.
}
};
14 changes: 11 additions & 3 deletions reference-implementation/lib/abstract-ops/queue-with-sizes.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict';
const assert = require('assert');
const { IsNonNegativeNumber } = require('./miscellaneous.js');
const { IsNonNegativeNumber, RunCloseSteps, StructuredTransferOrClone } = require('./miscellaneous.js');

exports.DequeueValue = container => {
assert('_queue' in container && '_queueTotalSize' in container);
Expand All @@ -15,7 +15,7 @@ exports.DequeueValue = container => {
return pair.value;
};

exports.EnqueueValueWithSize = (container, value, size) => {
exports.EnqueueValueWithSize = (container, value, size, transferList) => {
assert('_queue' in container && '_queueTotalSize' in container);

if (!IsNonNegativeNumber(size)) {
Expand All @@ -24,7 +24,9 @@ exports.EnqueueValueWithSize = (container, value, size) => {
if (size === Infinity) {
throw new RangeError('Size must be a finite, non-NaN, non-negative number.');
}

if (container._isOwning && !container._isPipeToOptimizedTransfer) {
value = StructuredTransferOrClone(value, transferList);
}
container._queue.push({ value, size });
container._queueTotalSize += size;
};
Expand All @@ -40,6 +42,12 @@ exports.PeekQueueValue = container => {
exports.ResetQueue = container => {
assert('_queue' in container && '_queueTotalSize' in container);

if (container._isOwning) {
while (container._queue.length > 0) {
const value = exports.DequeueValue(container);
RunCloseSteps(value);
}
}
container._queue = [];
container._queueTotalSize = 0;
};
41 changes: 29 additions & 12 deletions reference-implementation/lib/abstract-ops/readable-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ const { promiseResolvedWith, promiseRejectedWith, newPromise, resolvePromise, re
require('../helpers/webidl.js');
const { CanTransferArrayBuffer, CopyDataBlockBytes, CreateArrayFromList, IsDetachedBuffer, TransferArrayBuffer } =
require('./ecmascript.js');
const { CloneAsUint8Array, IsNonNegativeNumber } = require('./miscellaneous.js');
const { CloneAsUint8Array, IsNonNegativeNumber, RunCloseSteps, StructuredTransferOrClone } =
require('./miscellaneous.js');
const { EnqueueValueWithSize, ResetQueue } = require('./queue-with-sizes.js');
const { AcquireWritableStreamDefaultWriter, IsWritableStreamLocked, WritableStreamAbort,
WritableStreamDefaultWriterCloseWithErrorPropagation, WritableStreamDefaultWriterRelease,
Expand Down Expand Up @@ -89,7 +90,7 @@ function CreateReadableStream(startAlgorithm, pullAlgorithm, cancelAlgorithm, hi

const controller = ReadableStreamDefaultController.new(globalThis);
SetUpReadableStreamDefaultController(
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm, false
);

return stream;
Expand Down Expand Up @@ -136,6 +137,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC

const reader = AcquireReadableStreamDefaultReader(source);
const writer = AcquireWritableStreamDefaultWriter(dest);
writer._stream._controller._isPipeToOptimizedTransfer = source._controller._isOwning && dest._controller._isOwning;

source._disturbed = true;

Expand Down Expand Up @@ -206,7 +208,11 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
{
chunkSteps: chunk => {
currentWrite = transformPromiseWith(
WritableStreamDefaultWriterWrite(writer, chunk), undefined, () => {}
WritableStreamDefaultWriterWrite(writer, chunk), undefined, () => {
if (reader._stream._controller._isOwning) {
RunCloseSteps(chunk);
}
}
);
resolveRead(false);
},
Expand Down Expand Up @@ -319,6 +325,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
}

function finalize(isError, error) {
writer._stream._controller._isPipeToOptimizedTransfer = undefined;
WritableStreamDefaultWriterRelease(writer);
ReadableStreamDefaultReaderRelease(reader);

Expand All @@ -340,7 +347,7 @@ function ReadableStreamTee(stream, cloneForBranch2) {
if (ReadableByteStreamController.isImpl(stream._controller)) {
return ReadableByteStreamTee(stream);
}
return ReadableStreamDefaultTee(stream, cloneForBranch2);
return ReadableStreamDefaultTee(stream, stream._controller._isOwning ? true : cloneForBranch2);
}

function ReadableStreamDefaultTee(stream, cloneForBranch2) {
Expand Down Expand Up @@ -392,10 +399,10 @@ function ReadableStreamDefaultTee(stream, cloneForBranch2) {
// }

if (canceled1 === false) {
ReadableStreamDefaultControllerEnqueue(branch1._controller, chunk1);
ReadableStreamDefaultControllerEnqueue(branch1._controller, chunk1, undefined);
}
if (canceled2 === false) {
ReadableStreamDefaultControllerEnqueue(branch2._controller, chunk2);
ReadableStreamDefaultControllerEnqueue(branch2._controller, chunk2, undefined);
}

reading = false;
Expand Down Expand Up @@ -1074,14 +1081,22 @@ function ReadableStreamDefaultControllerClose(controller) {
}
}

function ReadableStreamDefaultControllerEnqueue(controller, chunk) {
function ReadableStreamDefaultControllerEnqueue(controller, chunk, transferList) {
if (ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) === false) {
return;
}

const stream = controller._stream;

if (IsReadableStreamLocked(stream) === true && ReadableStreamGetNumReadRequests(stream) > 0) {
if (controller._isOwning) {
try {
chunk = StructuredTransferOrClone(chunk, transferList);
} catch (chunkCloneError) {
ReadableStreamDefaultControllerError(controller, chunkCloneError);
throw chunkCloneError;
}
}
ReadableStreamFulfillReadRequest(stream, chunk, false);
} else {
let chunkSize;
Expand All @@ -1093,7 +1108,7 @@ function ReadableStreamDefaultControllerEnqueue(controller, chunk) {
}

try {
EnqueueValueWithSize(controller, chunk, chunkSize);
EnqueueValueWithSize(controller, chunk, chunkSize, transferList);
} catch (enqueueE) {
ReadableStreamDefaultControllerError(controller, enqueueE);
throw enqueueE;
Expand Down Expand Up @@ -1148,7 +1163,7 @@ function ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) {
}

function SetUpReadableStreamDefaultController(
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm) {
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm, isOwning) {
assert(stream._controller === undefined);

controller._stream = stream;
Expand All @@ -1169,6 +1184,8 @@ function SetUpReadableStreamDefaultController(
controller._pullAlgorithm = pullAlgorithm;
controller._cancelAlgorithm = cancelAlgorithm;

controller._isOwning = isOwning;

stream._controller = controller;

const startResult = startAlgorithm();
Expand All @@ -1195,7 +1212,7 @@ function SetUpReadableStreamDefaultControllerFromUnderlyingSource(
let startAlgorithm = () => undefined;
let pullAlgorithm = () => promiseResolvedWith(undefined);
let cancelAlgorithm = () => promiseResolvedWith(undefined);

const isOwning = underlyingSourceDict.type === 'owning';
if ('start' in underlyingSourceDict) {
startAlgorithm = () => underlyingSourceDict.start.call(underlyingSource, controller);
}
Expand All @@ -1207,8 +1224,8 @@ function SetUpReadableStreamDefaultControllerFromUnderlyingSource(
}

SetUpReadableStreamDefaultController(
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm
);
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm,
isOwning);
}

// Byte stream controllers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ function TransformStreamDefaultControllerEnqueue(controller, chunk) {
// accept TransformStreamDefaultControllerEnqueue() calls.

try {
ReadableStreamDefaultControllerEnqueue(readableController, chunk);
ReadableStreamDefaultControllerEnqueue(readableController, chunk, undefined);
} catch (e) {
// This happens when readableStrategy.size() throws.
TransformStreamErrorWritableAndUnblockWrite(stream, e);
Expand Down
13 changes: 8 additions & 5 deletions reference-implementation/lib/abstract-ops/writable-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ function WritableStreamDefaultWriterWrite(writer, chunk) {
// Default controllers

function SetUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm,
abortAlgorithm, highWaterMark, sizeAlgorithm) {
abortAlgorithm, highWaterMark, sizeAlgorithm, isOwning) {
assert(WritableStream.isImpl(stream));
assert(stream._controller === undefined);

Expand All @@ -551,6 +551,8 @@ function SetUpWritableStreamDefaultController(stream, controller, startAlgorithm
controller._closeAlgorithm = closeAlgorithm;
controller._abortAlgorithm = abortAlgorithm;

controller._isOwning = isOwning;

const backpressure = WritableStreamDefaultControllerGetBackpressure(controller);
WritableStreamUpdateBackpressure(stream, backpressure);

Expand Down Expand Up @@ -579,6 +581,7 @@ function SetUpWritableStreamDefaultControllerFromUnderlyingSink(stream, underlyi
let writeAlgorithm = () => promiseResolvedWith(undefined);
let closeAlgorithm = () => promiseResolvedWith(undefined);
let abortAlgorithm = () => promiseResolvedWith(undefined);
const isOwning = underlyingSinkDict.type === 'owning';

if ('start' in underlyingSinkDict) {
startAlgorithm = () => underlyingSinkDict.start.call(underlyingSink, controller);
Expand All @@ -594,8 +597,8 @@ function SetUpWritableStreamDefaultControllerFromUnderlyingSink(stream, underlyi
}

SetUpWritableStreamDefaultController(
stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm
);
stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm,
isOwning);
}

function WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller) {
Expand Down Expand Up @@ -637,7 +640,7 @@ function WritableStreamDefaultControllerClearAlgorithms(controller) {
}

function WritableStreamDefaultControllerClose(controller) {
EnqueueValueWithSize(controller, closeSentinel, 0);
EnqueueValueWithSize(controller, closeSentinel, 0, undefined);
WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
}

Expand Down Expand Up @@ -729,7 +732,7 @@ function WritableStreamDefaultControllerProcessWrite(controller, chunk) {

function WritableStreamDefaultControllerWrite(controller, chunk, chunkSize) {
try {
EnqueueValueWithSize(controller, chunk, chunkSize);
EnqueueValueWithSize(controller, chunk, chunkSize, undefined);
} catch (enqueueE) {
WritableStreamDefaultControllerErrorIfNeeded(controller, enqueueE);
return;
Expand Down
5 changes: 4 additions & 1 deletion reference-implementation/run-web-platform-tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ async function main() {
const excludeGlobs = [
// These tests use ArrayBuffers backed by WebAssembly.Memory objects, which *should* be non-transferable.
// However, our TransferArrayBuffer implementation cannot detect these, and will incorrectly "transfer" them anyway.
'readable-byte-streams/non-transferable-buffers.any.html'
'readable-byte-streams/non-transferable-buffers.any.html',
'readable-streams/owning-type-message-port.any.html', // MessagePort is not defined.
'readable-streams/owning-type-video-frame.any.html' // VideoFrame is not defined.
];
const anyTestPattern = /\.any\.html$/;

Expand All @@ -58,6 +60,7 @@ async function main() {
}
};
};
window.structuredClone = globalThis.structuredClone;
window.eval(bundledJS);
},
filter(testPath) {
Expand Down