Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add @@asyncIterator to ReadableStream #980

Merged
merged 18 commits into from
Feb 6, 2019
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 106 additions & 1 deletion index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ spec:promises-guide; type:dfn;
<pre class="anchors">
urlPrefix: https://tc39.github.io/ecma262/; spec: ECMASCRIPT
text: %Uint8Array%; url: #sec-typedarray-objects; type: constructor
text: %AsyncIteratorPrototype%; url: #sec-asynciteratorprototype; type: interface
text: AsyncIterator; url: #sec-asynciterator-interface; type: interface
text: ArrayBuffer; url: #sec-arraybuffer-objects; type: interface
text: DataView; url: #sec-dataview-objects; type: interface
text: Number; url: #sec-ecmascript-language-types-number-type; type: interface
Expand Down Expand Up @@ -399,11 +401,14 @@ like
get <a href="#rs-locked">locked</a>()

<a href="#rs-cancel">cancel</a>(reason)
<a href="#rs-get-reader">getReader</a>()
<a href="#rs-get-iterator">getIterator</a>({ preventCancel } = {})
<a href="#rs-get-reader">getReader</a>({ mode } = {})
ricea marked this conversation as resolved.
Show resolved Hide resolved
<a href="#rs-pipe-through">pipeThrough</a>({ writable, readable },
{ preventClose, preventAbort, preventCancel, signal } = {})
<a href="#rs-pipe-to">pipeTo</a>(dest, { preventClose, preventAbort, preventCancel, signal } = {})
<a href="#rs-tee">tee</a>()

<a href="#rs-asynciterator">[@@asyncIterator]</a>({ preventCancel } = {})
}
</code></pre>

Expand Down Expand Up @@ -602,6 +607,23 @@ option. If <code><a for="underlying source">type</a></code> is set to <code>unde
1. Return ! ReadableStreamCancel(*this*, _reason_).
</emu-alg>

<h5 id="rs-get-iterator" method for="ReadableStream">getIterator({ <var>preventCancel</var> } = {})</h5>

<div class="note">
The <code>getIterator</code> method returns an async iterator which can be used to consume the stream. The
{{ReadableStreamAsyncIteratorPrototype/return()}} method of this iterator object will, by default,
<a lt="cancel a readable stream">cancel</a> the stream; it will also release the reader.
</div>

<emu-alg>
1. If ! IsReadableStream(*this*) is *false*, throw a *TypeError* exception.
1. Let _reader_ be ? AcquireReadableStreamDefaultReader(*this*).
1. Let _iterator_ be ! ObjectCreate(`<a idl>ReadableStreamAsyncIteratorPrototype</a>`).
1. Set _iterator_.[[asyncIteratorReader]] to _reader_.
1. Set _iterator_.[[preventCancel]] to ! ToBoolean(_preventCancel_).
1. Return _iterator_.
</emu-alg>

<h5 id="rs-get-reader" method for="ReadableStream">getReader({ <var ignore>mode</var> } = {})</h5>

<div class="note">
Expand Down Expand Up @@ -792,6 +814,80 @@ option. If <code><a for="underlying source">type</a></code> is set to <code>unde
</code></pre>
</div>

<!-- Bikeshed doesn't let us mark this up correctly: https://github.com/tabatkins/bikeshed/issues/1344 -->
<h5 id="rs-asynciterator" iterator for="ReadableStream">[@@asyncIterator]({ <var>preventCancel</var> } = {})</h5>
MattiasBuelens marked this conversation as resolved.
Show resolved Hide resolved

<p class="note">
The <code>@@asyncIterator</code> method is an alias of {{ReadableStream/getIterator()}}.
</p>

The initial value of the <code>@@asyncIterator</code> method is the same function object as the initial value of the
{{ReadableStream/getIterator()}} method.

<h3 id="rs-asynciterator-prototype" interface
lt="ReadableStreamAsyncIteratorPrototype">ReadableStreamAsyncIteratorPrototype</h3>

{{ReadableStreamAsyncIteratorPrototype}} is an ordinary object that is used by {{ReadableStream/getIterator()}} to
construct the objects it returns. Instances of {{ReadableStreamAsyncIteratorPrototype}} implement the {{AsyncIterator}}
abstract interface from the JavaScript specification. [[!ECMASCRIPT]]

The {{ReadableStreamAsyncIteratorPrototype}} object must have its \[[Prototype]] internal slot set to
{{%AsyncIteratorPrototype%}}.

<h4 id="default-reader-asynciterator-prototype-internal-slots">Internal slots</h4>
Objects created by {{ReadableStream/getIterator()}}, using {{ReadableStreamAsyncIteratorPrototype}} as their
prototype, are created with the internal slots described in the following table:
<table>
<thead>
<tr>
<th>Internal Slot</th>
<th>Description (<em>non-normative</em>)</th>
</tr>
</thead>
<tr>
<td>\[[asyncIteratorReader]]
<td class="non-normative">A {{ReadableStreamDefaultReader}} instance
</tr>
<tr>
<td>\[[preventCancel]]
<td class="non-normative">A boolean value indicating if the stream will be <a lt="cancel a readable
stream">canceled</a> when the async iterator's {{ReadableStreamAsyncIteratorPrototype/return()}} method is called
</tr>
</table>

<h4 id="rs-asynciterator-prototype-next" method for="ReadableStreamAsyncIteratorPrototype">next()</h4>

<emu-alg>
1. If ! IsReadableStreamAsyncIterator(*this*) is *false*, return <a>a promise rejected with</a> a *TypeError* exception.
1. Let _reader_ be *this*.[[asyncIteratorReader]].
1. If _reader_.[[ownerReadableStream]] is *undefined*, return <a>a promise rejected with</a> a *TypeError* exception.
1. Return the result of <a>transforming</a> ! ReadableStreamDefaultReaderRead(_reader_) with a fulfillment handler
which takes the argument _result_ and performs the following steps:
1. Assert: Type(_result_) is Object.
1. Let _value_ be ? Get(_result_, `"value"`).
1. Let _done_ be ? Get(_result_, `"done"`).
1. Assert: Type(_done_) is Boolean.
1. If _done_ is *true*, perform ! ReadableStreamReaderGenericRelease(_reader_).
1. Return ! ReadableStreamCreateReadResult(_value_, _done_, *true*).
</emu-alg>

<h4 id="rs-asynciterator-prototype-return" method
for="ReadableStreamAsyncIteratorPrototype">return( <var>value</var> )</h4>

<emu-alg>
1. If ! IsReadableStreamAsyncIterator(*this*) is *false*, return <a>a promise rejected with</a> a *TypeError* exception.
1. Let _reader_ be *this*.[[asyncIteratorReader]].
1. If _reader_.[[ownerReadableStream]] is *undefined*, return <a>a promise rejected with</a> a *TypeError* exception.
1. If _reader_.[[readRequests]] is not empty, return <a>a promise rejected with</a> a *TypeError* exception.
1. If *this*.[[preventCancel]] is *false*, then:
1. Let _result_ be ! ReadableStreamReaderGenericCancel(_reader_, _value_).
1. Perform ! ReadableStreamReaderGenericRelease(_reader_).
1. Return the result of <a>transforming</a> _result_ by a fulfillment handler that returns !
ReadableStreamCreateReadResult(_value_, *true*, *true*).
1. Perform ! ReadableStreamReaderGenericRelease(_reader_).
1. Return <a>a promise resolved with</a> ! ReadableStreamCreateReadResult(_value_, *true*, *true*).
</emu-alg>

<h3 id="rs-abstract-ops">General readable stream abstract operations</h3>

The following abstract operations, unlike most in this specification, are meant to be generally useful by other
Expand Down Expand Up @@ -910,6 +1006,15 @@ readable stream is <a>locked to a reader</a>.
1. Return *true*.
</emu-alg>

<h4 id="is-readable-stream-asynciterator" aoid="IsReadableStreamAsyncIterator" nothrow
export>IsReadableStreamAsyncIterator ( <var>x</var> )</h4>

<emu-alg>
1. If Type(_x_) is not Object, return *false*.
1. If _x_ does not have a [[asyncIteratorReader]] internal slot, return *false*.
1. Return *true*.
</emu-alg>

<h4 id="readable-stream-tee" aoid="ReadableStreamTee" throws export>ReadableStreamTee ( <var>stream</var>,
<var>cloneForBranch2</var> )</h4>

Expand Down
11 changes: 10 additions & 1 deletion reference-implementation/.eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,16 @@
"id-blacklist": "off",
"id-length": "off",
"id-match": "off",
"indent": ["error", 2, { "SwitchCase": 1 }],
"indent": ["error", 2, {
"SwitchCase": 1,
"MemberExpression": 2,
"FunctionDeclaration": { "parameters": "first" },
MattiasBuelens marked this conversation as resolved.
Show resolved Hide resolved
"FunctionExpression": { "parameters": "first" },
"CallExpression": { "arguments": "first" },
"ArrayExpression": "first",
"ObjectExpression": "first",
"ImportDeclaration": "first"
}],
"jsx-quotes": "off",
"key-spacing": ["error", { "beforeColon": false, "afterColon": true, "mode": "strict" }],
"keyword-spacing": ["error", { "before": true, "after": true }],
Expand Down
122 changes: 101 additions & 21 deletions reference-implementation/lib/readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class ReadableStream {
}
if (IsWritableStream(dest) === false) {
return Promise.reject(
new TypeError('ReadableStream.prototype.pipeTo\'s first argument must be a WritableStream'));
new TypeError('ReadableStream.prototype.pipeTo\'s first argument must be a WritableStream'));
}

preventClose = Boolean(preventClose);
Expand Down Expand Up @@ -158,8 +158,72 @@ class ReadableStream {
const branches = ReadableStreamTee(this, false);
return createArrayFromList(branches);
}

getIterator({ preventCancel = false } = {}) {
if (IsReadableStream(this) === false) {
throw streamBrandCheckException('getIterator');
}
const reader = AcquireReadableStreamDefaultReader(this);
const iterator = Object.create(ReadableStreamAsyncIteratorPrototype);
iterator._asyncIteratorReader = reader;
iterator._preventCancel = Boolean(preventCancel);
return iterator;
}
}

const AsyncIteratorPrototype = Object.getPrototypeOf(Object.getPrototypeOf(async function* () {}).prototype);
const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({
MattiasBuelens marked this conversation as resolved.
Show resolved Hide resolved
next() {
if (IsReadableStreamAsyncIterator(this) === false) {
return Promise.reject(streamAsyncIteratorBrandCheckException('next'));
}
const reader = this._asyncIteratorReader;
if (reader._ownerReadableStream === undefined) {
return Promise.reject(readerLockException('iterate'));
}
return ReadableStreamDefaultReaderRead(reader).then(result => {
assert(typeIsObject(result));
const value = result.value;
const done = result.done;
assert(typeof done === 'boolean');
if (done) {
ReadableStreamReaderGenericRelease(reader);
}
return ReadableStreamCreateReadResult(value, done, true);
});
},

return(value) {
if (IsReadableStreamAsyncIterator(this) === false) {
return Promise.reject(streamAsyncIteratorBrandCheckException('next'));
}
const reader = this._asyncIteratorReader;
if (reader._ownerReadableStream === undefined) {
return Promise.reject(readerLockException('finish iterating'));
}
if (reader._readRequests.length > 0) {
return Promise.reject(new TypeError(
'Tried to release a reader lock when that reader has pending read() calls un-settled'));
}
if (this._preventCancel === false) {
const result = ReadableStreamReaderGenericCancel(reader, value);
ReadableStreamReaderGenericRelease(reader);
return result.then(() => ReadableStreamCreateReadResult(value, true, true));
}
ReadableStreamReaderGenericRelease(reader);
return Promise.resolve(ReadableStreamCreateReadResult(value, true, true));
}
}, AsyncIteratorPrototype);
Object.defineProperty(ReadableStreamAsyncIteratorPrototype, 'next', { enumerable: false });
Object.defineProperty(ReadableStreamAsyncIteratorPrototype, 'return', { enumerable: false });

Object.defineProperty(ReadableStream.prototype, Symbol.asyncIterator, {
value: ReadableStream.prototype.getIterator,
enumerable: false,
writable: true,
configurable: true
});

module.exports = {
CreateReadableByteStream,
CreateReadableStream,
Expand Down Expand Up @@ -194,7 +258,7 @@ function CreateReadableStream(startAlgorithm, pullAlgorithm, cancelAlgorithm, hi
const controller = Object.create(ReadableStreamDefaultController.prototype);

SetUpReadableStreamDefaultController(
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm
);

return stream;
Expand Down Expand Up @@ -255,6 +319,18 @@ function IsReadableStreamLocked(stream) {
return true;
}

function IsReadableStreamAsyncIterator(x) {
if (!typeIsObject(x)) {
return false;
}

if (!Object.prototype.hasOwnProperty.call(x, '_asyncIteratorReader')) {
return false;
}

return true;
}

function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventCancel, signal) {
assert(IsReadableStream(source) === true);
assert(IsWritableStream(dest) === true);
Expand Down Expand Up @@ -420,8 +496,8 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC

function doTheRest() {
action().then(
() => finalize(originalIsError, originalError),
newError => finalize(true, newError)
() => finalize(originalIsError, originalError),
newError => finalize(true, newError)
)
.catch(rethrowAssertionErrorRejection);
}
Expand Down Expand Up @@ -931,12 +1007,12 @@ function ReadableStreamReaderGenericRelease(reader) {

if (reader._ownerReadableStream._state === 'readable') {
defaultReaderClosedPromiseReject(
reader,
new TypeError('Reader was released and can no longer be used to monitor the stream\'s closedness'));
reader,
new TypeError('Reader was released and can no longer be used to monitor the stream\'s closedness'));
} else {
defaultReaderClosedPromiseResetToRejected(
reader,
new TypeError('Reader was released and can no longer be used to monitor the stream\'s closedness'));
reader,
new TypeError('Reader was released and can no longer be used to monitor the stream\'s closedness'));
}
reader._closedPromise.catch(() => {});

Expand Down Expand Up @@ -1099,7 +1175,7 @@ function ReadableStreamDefaultControllerCallPullIfNeeded(controller) {
ReadableStreamDefaultControllerError(controller, e);
}
)
.catch(rethrowAssertionErrorRejection);
.catch(rethrowAssertionErrorRejection);

return undefined;
}
Expand Down Expand Up @@ -1261,7 +1337,7 @@ function SetUpReadableStreamDefaultController(
ReadableStreamDefaultControllerError(controller, r);
}
)
.catch(rethrowAssertionErrorRejection);
.catch(rethrowAssertionErrorRejection);
}

function SetUpReadableStreamDefaultControllerFromUnderlyingSource(stream, underlyingSource, highWaterMark,
Expand Down Expand Up @@ -1534,7 +1610,7 @@ function ReadableByteStreamControllerCallPullIfNeeded(controller) {
ReadableByteStreamControllerError(controller, e);
}
)
.catch(rethrowAssertionErrorRejection);
.catch(rethrowAssertionErrorRejection);

return undefined;
}
Expand Down Expand Up @@ -1570,7 +1646,7 @@ function ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescripto
assert(bytesFilled % elementSize === 0);

return new pullIntoDescriptor.ctor(
pullIntoDescriptor.buffer, pullIntoDescriptor.byteOffset, bytesFilled / elementSize);
pullIntoDescriptor.buffer, pullIntoDescriptor.byteOffset, bytesFilled / elementSize);
}

function ReadableByteStreamControllerEnqueueChunkToQueue(controller, buffer, byteOffset, byteLength) {
Expand Down Expand Up @@ -1994,17 +2070,17 @@ function SetUpReadableByteStreamController(stream, controller, startAlgorithm, p

const startResult = startAlgorithm();
Promise.resolve(startResult).then(
() => {
controller._started = true;
() => {
controller._started = true;

assert(controller._pulling === false);
assert(controller._pullAgain === false);
assert(controller._pulling === false);
assert(controller._pullAgain === false);

ReadableByteStreamControllerCallPullIfNeeded(controller);
},
r => {
ReadableByteStreamControllerError(controller, r);
}
ReadableByteStreamControllerCallPullIfNeeded(controller);
},
r => {
ReadableByteStreamControllerError(controller, r);
}
)
.catch(rethrowAssertionErrorRejection);
}
Expand Down Expand Up @@ -2063,6 +2139,10 @@ function streamBrandCheckException(name) {
return new TypeError(`ReadableStream.prototype.${name} can only be used on a ReadableStream`);
}

function streamAsyncIteratorBrandCheckException(name) {
return new TypeError(`ReadableStreamAsyncIterator.${name} can only be used on a ReadableSteamAsyncIterator`);
}

// Helper functions for the readers.

function readerLockException(name) {
Expand Down
Loading