Skip to content

Commit

Permalink
stream: handle a pending pull request from a released reader
Browse files Browse the repository at this point in the history
In order to meet the specification, this includes mainly the followings:

- Adding the 'release steps' to ReadableStreamController
- Responding to a pull request from a released reader in
ReadableByteStreamController

Signed-off-by: Daeyeon Jeong daeyeon.dev@gmail.com
PR-URL: #44702
Refs: https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontroller-releasesteps
Refs: https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-in-readable-state
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Minwoo Jung <nodecorelab@gmail.com>
  • Loading branch information
daeyeon authored and RafaelGSS committed Sep 26, 2022
1 parent 33a2f17 commit 3d42aaa
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 8 deletions.
4 changes: 4 additions & 0 deletions doc/api/webstreams.md
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,10 @@ Signals an error that causes the {ReadableStream} to error and close.
<!-- YAML
added: v16.5.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/44702
description: Support handling a BYOB pull request from a released reader.
-->
Every {ReadableStream} has a controller that is responsible for
Expand Down
82 changes: 82 additions & 0 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ const kClose = Symbol('kClose');
const kChunk = Symbol('kChunk');
const kError = Symbol('kError');
const kPull = Symbol('kPull');
const kRelease = Symbol('kRelease');

/**
* @typedef {import('../abort_controller').AbortSignal} AbortSignal
Expand Down Expand Up @@ -1019,6 +1020,8 @@ class ReadableStreamDefaultController {
readableStreamDefaultControllerPullSteps(this, readRequest);
}

[kRelease]() {}

[kInspect](depth, options) {
return customInspect(depth, options, this[kType], { });
}
Expand Down Expand Up @@ -1143,6 +1146,17 @@ class ReadableByteStreamController {
readableByteStreamControllerPullSteps(this, readRequest);
}

[kRelease]() {
const {
pendingPullIntos,
} = this[kState];
if (pendingPullIntos.length > 0) {
const firstPendingPullInto = pendingPullIntos[0];
firstPendingPullInto.type = 'none';
this[kState].pendingPullIntos = [firstPendingPullInto];
}
}

[kInspect](depth, options) {
return customInspect(depth, options, this[kType], { });
}
Expand Down Expand Up @@ -2060,6 +2074,9 @@ function readableStreamReaderGenericRelease(reader) {
};
}
setPromiseHandled(reader[kState].close.promise);

stream[kState].controller[kRelease]();

stream[kState].reader = undefined;
reader[kState].stream = undefined;
}
Expand Down Expand Up @@ -2365,6 +2382,8 @@ function readableByteStreamControllerClose(controller) {

function readableByteStreamControllerCommitPullIntoDescriptor(stream, desc) {
assert(stream[kState].state !== 'errored');
assert(desc.type !== 'none');

let done = false;
if (stream[kState].state === 'closed') {
desc.bytesFilled = 0;
Expand Down Expand Up @@ -2574,6 +2593,9 @@ function readableByteStreamControllerRespond(controller, bytesWritten) {

function readableByteStreamControllerRespondInClosedState(controller, desc) {
assert(!desc.bytesFilled);
if (desc.type === 'none') {
readableByteStreamControllerShiftPendingPullInto(controller);
}
const {
stream,
} = controller[kState];
Expand Down Expand Up @@ -2663,6 +2685,31 @@ function readableByteStreamControllerEnqueue(controller, chunk) {
readableByteStreamControllerCallPullIfNeeded(controller);
}

function readableByteStreamControllerEnqueueClonedChunkToQueue(
controller,
buffer,
byteOffset,
byteLength
) {
let cloneResult;
try {
cloneResult = ArrayBufferPrototypeSlice(
buffer,
byteOffset,
byteOffset + byteLength
);
} catch (error) {
readableByteStreamControllerError(controller, error);
throw error;
}
readableByteStreamControllerEnqueueChunkToQueue(
controller,
cloneResult,
0,
byteLength
);
}

function readableByteStreamControllerEnqueueChunkToQueue(
controller,
buffer,
Expand All @@ -2678,6 +2725,29 @@ function readableByteStreamControllerEnqueueChunkToQueue(
controller[kState].queueTotalSize += byteLength;
}

function readableByteStreamControllerEnqueueDetachedPullIntoToQueue(
controller,
desc
) {
const {
buffer,
byteOffset,
bytesFilled,
type,
} = desc;
assert(type === 'none');

if (bytesFilled > 0) {
readableByteStreamControllerEnqueueClonedChunkToQueue(
controller,
buffer,
byteOffset,
bytesFilled
);
}
readableByteStreamControllerShiftPendingPullInto(controller);
}

function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
controller,
desc) {
Expand Down Expand Up @@ -2773,6 +2843,7 @@ function readableByteStreamControllerRespondInReadableState(
buffer,
bytesFilled,
byteLength,
type,
} = desc;

if (bytesFilled + bytesWritten > byteLength)
Expand All @@ -2783,6 +2854,17 @@ function readableByteStreamControllerRespondInReadableState(
bytesWritten,
desc);

if (type === 'none') {
readableByteStreamControllerEnqueueDetachedPullIntoToQueue(
controller,
desc
);
readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
controller
);
return;
}

if (desc.bytesFilled < desc.elementSize)
return;

Expand Down
8 changes: 0 additions & 8 deletions test/wpt/status/streams.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,9 @@
"fail": {
"expected": [
"ReadableStream with byte source: enqueue() discards auto-allocated BYOB request",
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, respond()",
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader with 1 element Uint16Array, respond(1)",
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader with 2 element Uint8Array, respond(3)",
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, respondWithNewView()",
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, enqueue()",
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, close(), respond(0)",
"ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read() on second reader, respond()",
"ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read() on second reader, enqueue()",
"ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read(view) on second reader, respond()",
"ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read(view) on second reader, enqueue()",
"ReadableStream with byte source: read(view) with 1 element Uint16Array, respond(1), releaseLock(), read(view) on second reader with 1 element Uint16Array, respond(1)",
"ReadableStream with byte source: read(view) with 1 element Uint16Array, respond(1), releaseLock(), read() on second reader, enqueue()"
]
}
Expand Down

0 comments on commit 3d42aaa

Please sign in to comment.