Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

web streams: commit pull-into descriptors after filling from queue #56072

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 36 additions & 19 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ const {
ArrayBufferViewGetByteLength,
ArrayBufferViewGetByteOffset,
AsyncIterator,
canCopyArrayBuffer,
cloneAsUint8Array,
copyArrayBuffer,
createPromiseCallback,
Expand Down Expand Up @@ -2552,6 +2553,15 @@ function readableByteStreamControllerCommitPullIntoDescriptor(stream, desc) {
}
}

function readableByteStreamControllerCommitPullIntoDescriptors(stream, descriptors) {
for (let i = 0; i < descriptors.length; ++i) {
readableByteStreamControllerCommitPullIntoDescriptor(
stream,
descriptors[i],
);
}
}

function readableByteStreamControllerInvalidateBYOBRequest(controller) {
if (controller[kState].byobRequest === null)
return;
Expand Down Expand Up @@ -2758,11 +2768,11 @@ function readableByteStreamControllerRespondInClosedState(controller, desc) {
stream,
} = controller[kState];
if (readableStreamHasBYOBReader(stream)) {
while (readableStreamGetNumReadIntoRequests(stream) > 0) {
readableByteStreamControllerCommitPullIntoDescriptor(
stream,
readableByteStreamControllerShiftPendingPullInto(controller));
const filledPullIntos = [];
for (let i = 0; i < readableStreamGetNumReadIntoRequests(stream); ++i) {
ArrayPrototypePush(filledPullIntos, readableByteStreamControllerShiftPendingPullInto(controller));
}
readableByteStreamControllerCommitPullIntoDescriptors(stream, filledPullIntos);
}
}

Expand Down Expand Up @@ -2843,8 +2853,9 @@ function readableByteStreamControllerEnqueue(controller, chunk) {
transferredBuffer,
byteOffset,
byteLength);
readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
const filledPullIntos = readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
controller);
readableByteStreamControllerCommitPullIntoDescriptors(stream, filledPullIntos);
} else {
assert(!isReadableStreamLocked(stream));
readableByteStreamControllerEnqueueChunkToQueue(
Expand Down Expand Up @@ -2937,6 +2948,7 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % elementSize);
let totalBytesToCopyRemaining = maxBytesToCopy;
let ready = false;
assert(!ArrayBufferPrototypeGetDetached(buffer));
assert(bytesFilled < minimumFill);
if (maxAlignedBytes >= minimumFill) {
totalBytesToCopyRemaining = maxAlignedBytes - bytesFilled;
Expand All @@ -2952,12 +2964,12 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
totalBytesToCopyRemaining,
headOfQueue.byteLength);
const destStart = byteOffset + desc.bytesFilled;
const arrayBufferByteLength = ArrayBufferPrototypeGetByteLength(buffer);
if (arrayBufferByteLength - destStart < bytesToCopy) {
throw new ERR_INVALID_STATE.RangeError(
'view ArrayBuffer size is invalid');
}
assert(arrayBufferByteLength - destStart >= bytesToCopy);
assert(canCopyArrayBuffer(
buffer,
destStart,
headOfQueue.buffer,
headOfQueue.byteOffset,
bytesToCopy));
copyArrayBuffer(
buffer,
destStart,
Expand Down Expand Up @@ -2991,26 +3003,30 @@ function readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
const {
closeRequested,
pendingPullIntos,
stream,
} = controller[kState];
assert(!closeRequested);
const filledPullIntos = [];
while (pendingPullIntos.length) {
if (!controller[kState].queueTotalSize)
return;
break;
const desc = pendingPullIntos[0];
if (readableByteStreamControllerFillPullIntoDescriptorFromQueue(
controller,
desc)) {
readableByteStreamControllerShiftPendingPullInto(controller);
readableByteStreamControllerCommitPullIntoDescriptor(stream, desc);
ArrayPrototypePush(filledPullIntos, desc);
}
}
return filledPullIntos;
}

function readableByteStreamControllerRespondInReadableState(
controller,
bytesWritten,
desc) {
const {
stream,
} = controller[kState];
const {
buffer,
bytesFilled,
Expand All @@ -3031,9 +3047,10 @@ function readableByteStreamControllerRespondInReadableState(
controller,
desc,
);
readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
const filledPullIntos = readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
controller,
);
readableByteStreamControllerCommitPullIntoDescriptors(stream, filledPullIntos);
return;
}

Expand All @@ -3059,10 +3076,10 @@ function readableByteStreamControllerRespondInReadableState(
ArrayBufferPrototypeGetByteLength(remainder));
}
desc.bytesFilled -= remainderSize;
readableByteStreamControllerCommitPullIntoDescriptor(
controller[kState].stream,
desc);
readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller);
const filledPullIntos = readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller);

readableByteStreamControllerCommitPullIntoDescriptor(stream, desc);
readableByteStreamControllerCommitPullIntoDescriptors(stream, filledPullIntos);
}

function readableByteStreamControllerRespondWithNewView(controller, view) {
Expand Down
11 changes: 11 additions & 0 deletions lib/internal/webstreams/util.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
'use strict';

const {
ArrayBufferPrototypeGetByteLength,
ArrayBufferPrototypeGetDetached,
ArrayBufferPrototypeSlice,
ArrayPrototypePush,
ArrayPrototypeShift,
Expand Down Expand Up @@ -107,6 +109,14 @@ function cloneAsUint8Array(view) {
);
}

function canCopyArrayBuffer(toBuffer, toIndex, fromBuffer, fromIndex, count) {
return toBuffer !== fromBuffer &&
!ArrayBufferPrototypeGetDetached(toBuffer) &&
!ArrayBufferPrototypeGetDetached(fromBuffer) &&
toIndex + count <= ArrayBufferPrototypeGetByteLength(toBuffer) &&
fromIndex + count <= ArrayBufferPrototypeGetByteLength(fromBuffer);
}

function isBrandCheck(brand) {
return (value) => {
return value != null &&
Expand Down Expand Up @@ -261,6 +271,7 @@ module.exports = {
ArrayBufferViewGetByteLength,
ArrayBufferViewGetByteOffset,
AsyncIterator,
canCopyArrayBuffer,
createPromiseCallback,
cloneAsUint8Array,
copyArrayBuffer,
Expand Down
2 changes: 1 addition & 1 deletion test/fixtures/wpt/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ Last update:
- performance-timeline: https://github.com/web-platform-tests/wpt/tree/94caab7038/performance-timeline
- resource-timing: https://github.com/web-platform-tests/wpt/tree/22d38586d0/resource-timing
- resources: https://github.com/web-platform-tests/wpt/tree/1e140d63ec/resources
- streams: https://github.com/web-platform-tests/wpt/tree/2bd26e124c/streams
- streams: https://github.com/web-platform-tests/wpt/tree/bc9dcbbf1a/streams
- url: https://github.com/web-platform-tests/wpt/tree/67880a4eb8/url
- user-timing: https://github.com/web-platform-tests/wpt/tree/5ae85bf826/user-timing
- wasm/jsapi: https://github.com/web-platform-tests/wpt/tree/cde25e7e3c/wasm/jsapi
Expand Down
2 changes: 0 additions & 2 deletions test/fixtures/wpt/streams/idlharness-shadowrealm.window.js

This file was deleted.

2 changes: 1 addition & 1 deletion test/fixtures/wpt/streams/idlharness.any.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// META: global=window,worker
// META: global=window,worker,shadowrealm-in-window
// META: script=/resources/WebIDLParser.js
// META: script=/resources/idlharness.js
// META: timeout=long
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -870,11 +870,11 @@ promise_test(() => {
start(c) {
controller = c;
},
async pull() {
pull() {
byobRequestDefined.push(controller.byobRequest !== null);
const initialByobRequest = controller.byobRequest;

const transferredView = await transferArrayBufferView(controller.byobRequest.view);
const transferredView = transferArrayBufferView(controller.byobRequest.view);
transferredView[0] = 0x01;
controller.byobRequest.respondWithNewView(transferredView);

Expand Down Expand Up @@ -2288,7 +2288,7 @@ promise_test(async t => {
await pullCalledPromise;

// Transfer the original BYOB request's buffer, and respond with a new view on that buffer
const transferredView = await transferArrayBufferView(controller.byobRequest.view);
const transferredView = transferArrayBufferView(controller.byobRequest.view);
const newView = transferredView.subarray(0, 1);
newView[0] = 42;

Expand Down Expand Up @@ -2328,7 +2328,7 @@ promise_test(async t => {
await pullCalledPromise;

// Transfer the original BYOB request's buffer, and respond with an empty view on that buffer
const transferredView = await transferArrayBufferView(controller.byobRequest.view);
const transferredView = transferArrayBufferView(controller.byobRequest.view);
const newView = transferredView.subarray(0, 0);

controller.close();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// META: global=window,worker,shadowrealm
// META: script=../resources/test-utils.js
'use strict';

// Tests which patch the global environment are kept separate to avoid
// interfering with other tests.

promise_test(async (t) => {
let controller;
const rs = new ReadableStream({
type: 'bytes',
start(c) {
controller = c;
}
});
const reader = rs.getReader({mode: 'byob'});

const length = 0x4000;
const buffer = new ArrayBuffer(length);
const bigArray = new BigUint64Array(buffer, length - 8, 1);

const read1 = reader.read(new Uint8Array(new ArrayBuffer(0x100)));
const read2 = reader.read(bigArray);

let flag = false;
Object.defineProperty(Object.prototype, 'then', {
get: t.step_func(() => {
if (!flag) {
flag = true;
assert_equals(controller.byobRequest, null, 'byobRequest should be null after filling both views');
}
}),
configurable: true
});
t.add_cleanup(() => {
delete Object.prototype.then;
});

controller.enqueue(new Uint8Array(0x110).fill(0x42));
assert_true(flag, 'patched then() should be called');

// The first read() is filled entirely with 0x100 bytes
const result1 = await read1;
assert_false(result1.done, 'result1.done');
assert_typed_array_equals(result1.value, new Uint8Array(0x100).fill(0x42), 'result1.value');

// The second read() is filled with the remaining 0x10 bytes
const result2 = await read2;
assert_false(result2.done, 'result2.done');
assert_equals(result2.value.constructor, BigUint64Array, 'result2.value constructor');
assert_equals(result2.value.byteOffset, length - 8, 'result2.value byteOffset');
assert_equals(result2.value.length, 1, 'result2.value length');
assert_array_equals([...result2.value], [0x42424242_42424242n], 'result2.value contents');
}, 'Patched then() sees byobRequest after filling all pending pull-into descriptors');
33 changes: 33 additions & 0 deletions test/fixtures/wpt/streams/readable-byte-streams/tee.any.js
Original file line number Diff line number Diff line change
Expand Up @@ -934,3 +934,36 @@ promise_test(async () => {
assert_typed_array_equals(result4.value, new Uint8Array([0]).subarray(0, 0), 'second chunk from branch2 should be correct');

}, 'ReadableStream teeing with byte source: respond() and close() while both branches are pulling');

promise_test(async t => {
let pullCount = 0;
const arrayBuffer = new Uint8Array([0x01, 0x02, 0x03]).buffer;
const enqueuedChunk = new Uint8Array(arrayBuffer, 2);
assert_equals(enqueuedChunk.length, 1);
assert_equals(enqueuedChunk.byteOffset, 2);
const rs = new ReadableStream({
type: 'bytes',
pull(c) {
++pullCount;
if (pullCount === 1) {
c.enqueue(enqueuedChunk);
}
}
});

const [branch1, branch2] = rs.tee();
const reader1 = branch1.getReader();
const reader2 = branch2.getReader();

const [result1, result2] = await Promise.all([reader1.read(), reader2.read()]);
assert_equals(result1.done, false, 'reader1 done');
assert_equals(result2.done, false, 'reader2 done');

const view1 = result1.value;
const view2 = result2.value;
// The first stream has the transferred buffer, but the second stream has the
// cloned buffer.
const underlying = new Uint8Array([0x01, 0x02, 0x03]).buffer;
assert_typed_array_equals(view1, new Uint8Array(underlying, 2), 'reader1 value');
assert_typed_array_equals(view2, new Uint8Array([0x03]), 'reader2 value');
}, 'ReadableStream teeing with byte source: reading an array with a byte offset should clone correctly');
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<html class="test-wait">
<meta charset="utf-8">
<script type="module">
let a = window.open()
try {
let dir = await a.navigator.storage.getDirectory()
let hdl = await dir.getFileHandle("7399d8cf-9ff9-494d-89eb-d3045f229c27", {"create": true})
let map = new Map([[]])
let b = ReadableStream.from(map)
let c = await hdl.createWritable({ })
await b.pipeTo(c, { }).catch(() => {
// Error expected as we are not piping the right form of chunk to FileHandle
})
} finally {
document.documentElement.classList.remove("test-wait")
a.close()
}
</script>
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// META: global=window,worker,shadowrealm
// META: global=window,worker
// META: script=../resources/test-utils.js
// META: script=../resources/rs-utils.js
'use strict';
Expand Down
Loading
Loading