From 6c966265528ddfa9f283719a3e08644e7abf235e Mon Sep 17 00:00:00 2001 From: Gus Caplan Date: Fri, 7 Sep 2018 13:32:53 -0500 Subject: [PATCH 1/7] add @@asyncIterator to ReadableStream --- index.bs | 89 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) diff --git a/index.bs b/index.bs index 197615cb5..d608a16c5 100644 --- a/index.bs +++ b/index.bs @@ -403,6 +403,8 @@ like pipeThrough({ writable, readable }, options) pipeTo(dest, { preventClose, preventAbort, preventCancel } = {}) tee() + + [@@asyncIterator]({ preventCancel } = {}) } @@ -866,6 +868,84 @@ option. If type is set to unde + +
[@@asyncIterator]({ preventCancel = false } = {})
+ +
+ The @@asyncIterator method creates a default reader and locks the + stream to the new reader. While the stream is locked, no other reader can be acquired until this one is released. + + This functionality is especially useful for creating abstractions that desire the ability to consume a stream in its + entirety. By getting a reader for the stream, you can ensure nobody else can interleave reads with yours or cancel + the stream, which would interfere with your abstraction. +
+ + + 1. If ! IsReadableStream(*this*) is *false*, throw a *TypeError* exception. + 1. Let _reader_ be ? AcquireReadableStreamDefaultReader(*this*). + 1. Let _iterator_ be ! ObjectCreate(`ReadableStreamAsyncIteratorPrototype`). + 1. Set _iterator_.[[asyncIteratorReader]] to _reader_. + 1. Set _iterator_.[[preventCancel]] to ! ToBoolean(_preventCancel_). + 1. Return _iterator_. + + +

ReadableStreamAsyncIteratorPrototype

+ +{{ReadableStreamAsyncIteratorPrototype}} is an ordinary object that is used by {{ReadableStream/[@@asyncIterator]()}} to +construct the objects it returns. Instances of {{ReadableStreamAsyncIteratorPrototype}} implmenet the {{AsyncIterator}} +abstract interface from the JavaScript specification. [[!ECMASCRIPT]] + +The {{ReadableStreamAsyncIteratorPrototype}} object must have its \[[Prototype]] internal slot set to +{{%AsyncIteratorPrototype%}}. + +

Internal slots

+Objects created by {{ReadableStream/[@@asyncIterator]()}}, using {{ReadableStreamAsyncIteratorPrototype}} as their +prototype, are created with the internal slots described in the following table: + + + + + + + + + + + +
Internal SlotDescription (non-normative)
\[[asyncIteratorReader]] + A {{ReadableStreamDefaultReader}} instance +
\[[preventCancel]] + A boolean value indicating if the stream will be canceled when the async iterator's {{ReadableStreamAsyncIteratorPrototype/return()}} method is called +
+ +

next()

+ + + 1. If ! IsReadableStreamAsyncIterator(*this*) is *false*, throw a *TypeError* exception. + 1. Let _reader_ be *this*.[[asyncIteratorReader]]. + 1. If _reader_.[[ownerReadableStream]] is *undefined*, return a promise rejected with a *TypeError* exception. + 1. Return ! ReadableStreamDefaultReaderRead(_reader_, *true*). + + +

return( value )

+ + + 1. If ! IsReadableStreamAsyncIterator(*this*) is *false*, throw a *TypeError* exception. + 1. Let _reader_ be *this*.[[asyncIteratorReader]]. + 1. If *this*.[[preventCancel]] is *false*, then: + 1. If _reader_.[[ownerReadableStream]] is *undefined*, return a promise rejected with a *TypeError* + exception. + 1. Let _result_ be ! ReadableStreamReaderGenericCancel(_reader_, _value_). + 1. Otherwise, + 1. Let _result_ be ! ReadableStreamCreateReadResult(_value_, *true*, *true*). + 1. Perform ! ReadableStreamReaderGenericRelease(_reader_). + 1. Return _result_. + +

General readable stream abstract operations

The following abstract operations, unlike most in this specification, are meant to be generally useful by other @@ -984,6 +1064,15 @@ readable stream is locked to a reader. 1. Return *true*. +

IsReadableStreamAsyncIterator ( x )

+ + + 1. If Type(_x_) is not Object, return *false*. + 1. If _x_ does not have a [[asyncIteratorReader]] internal slot, return *false*. + 1. Return *true*. + +

ReadableStreamTee ( stream, cloneForBranch2 )

From 7e7aaa19412ecee2c614f1fe72d1ed82990ffc2c Mon Sep 17 00:00:00 2001 From: Gus Caplan Date: Fri, 7 Sep 2018 14:11:06 -0500 Subject: [PATCH 2/7] squash! nits --- index.bs | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/index.bs b/index.bs index d608a16c5..f96d889ab 100644 --- a/index.bs +++ b/index.bs @@ -868,17 +868,12 @@ option. If type is set to unde - -
[@@asyncIterator]({ preventCancel = false } = {})
+
getIterator({ preventCancel = false } = {})
- The @@asyncIterator method creates a default reader and locks the - stream to the new reader. While the stream is locked, no other reader can be acquired until this one is released. - - This functionality is especially useful for creating abstractions that desire the ability to consume a stream in its - entirety. By getting a reader for the stream, you can ensure nobody else can interleave reads with yours or cancel - the stream, which would interfere with your abstraction. + The getIterator method returns an async iterator which can be used to consume the stream. The + {{ReadableStreamDefaultReaderAsyncIteratorPrototype/return()}} method of this iterator object will, by default, + cancel the stream; it will also release the reader.
@@ -890,11 +885,21 @@ option. If type is set to unde 1. Return _iterator_. + +
[@@asyncIterator]()
+ +

+ The @@asyncIterator method is an alias of {{ReadableStreamDefaultReader/getIterator()}}. +

+ +The initial value of the @@asyncIterator method is the same function object as the initial value of the +{{ReadableStream/getIterator()}} method. +

ReadableStreamAsyncIteratorPrototype

{{ReadableStreamAsyncIteratorPrototype}} is an ordinary object that is used by {{ReadableStream/[@@asyncIterator]()}} to -construct the objects it returns. Instances of {{ReadableStreamAsyncIteratorPrototype}} implmenet the {{AsyncIterator}} +construct the objects it returns. Instances of {{ReadableStreamAsyncIteratorPrototype}} implement the {{AsyncIterator}} abstract interface from the JavaScript specification. [[!ECMASCRIPT]] The {{ReadableStreamAsyncIteratorPrototype}} object must have its \[[Prototype]] internal slot set to @@ -941,9 +946,9 @@ for="ReadableStreamAsyncIteratorPrototype">return( value ) exception. 1. Let _result_ be ! ReadableStreamReaderGenericCancel(_reader_, _value_). 1. Otherwise, - 1. Let _result_ be ! ReadableStreamCreateReadResult(_value_, *true*, *true*). + 1. Let _result_ be _value_. 1. Perform ! ReadableStreamReaderGenericRelease(_reader_). - 1. Return _result_. + 1. Return a promise resolved with ! ReadableStreamCreateReadResult(_result_, *true*, *true*).

General readable stream abstract operations

From b682fdc300e70a155ea85305fec92e991699c5fd Mon Sep 17 00:00:00 2001 From: Gus Caplan Date: Fri, 7 Sep 2018 14:13:16 -0500 Subject: [PATCH 3/7] squash! add cross-links --- index.bs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/index.bs b/index.bs index f96d889ab..bc133b6c7 100644 --- a/index.bs +++ b/index.bs @@ -18,6 +18,8 @@ spec:promises-guide; type:dfn;
 urlPrefix: https://tc39.github.io/ecma262/; spec: ECMASCRIPT
     text: %Uint8Array%; url: #sec-typedarray-objects; type: constructor
+    text: %AsyncIteratorPrototype%; url: #sec-asynciteratorprototype; type: interface
+    text: AsyncIterator; url: #sec-asynciterator-interface; type: interface
     text: ArrayBuffer; url: #sec-arraybuffer-objects; type: interface
     text: DataView; url: #sec-dataview-objects; type: interface
     text: Number; url: #sec-ecmascript-language-types-number-type; type: interface

From d08405fbb25f5ad8efd91ba19d1bd904ee6159ff Mon Sep 17 00:00:00 2001
From: Gus Caplan 
Date: Fri, 7 Sep 2018 14:45:32 -0500
Subject: [PATCH 4/7] squash! transform cancel promise

---
 index.bs | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/index.bs b/index.bs
index bc133b6c7..385fb3612 100644
--- a/index.bs
+++ b/index.bs
@@ -947,10 +947,11 @@ for="ReadableStreamAsyncIteratorPrototype">return( value )
     1. If _reader_.[[ownerReadableStream]] is *undefined*, return a promise rejected with a *TypeError*
     exception.
     1. Let _result_ be ! ReadableStreamReaderGenericCancel(_reader_, _value_).
-  1. Otherwise,
-    1. Let _result_ be _value_.
+    1. Perform ! ReadableStreamReaderGenericRelease(_reader_).
+    1. Return the result of transforming _result_ by a fulfillment handler that returns !
+    ReadableStreamCreateReadResult(_value_, *true*, *true*).
   1. Perform ! ReadableStreamReaderGenericRelease(_reader_).
-  1. Return a promise resolved with ! ReadableStreamCreateReadResult(_result_, *true*, *true*).
+  1. Return a promise resolved with ! ReadableStreamCreateReadResult(_value_, *true*, *true*).
 
 
 

General readable stream abstract operations

From d84893a91c8d824674dea8a5eccf340a890e05cb Mon Sep 17 00:00:00 2001 From: Gus Caplan Date: Sat, 15 Sep 2018 11:40:08 -0500 Subject: [PATCH 5/7] squash! stuff --- index.bs | 7 +- reference-implementation/.eslintrc.json | 2 +- .../lib/readable-stream.js | 65 +++++++++++++++++++ 3 files changed, 71 insertions(+), 3 deletions(-) diff --git a/index.bs b/index.bs index 385fb3612..c5ae8c64d 100644 --- a/index.bs +++ b/index.bs @@ -931,7 +931,7 @@ prototype, are created with the internal slots described in the following table:

next()

- 1. If ! IsReadableStreamAsyncIterator(*this*) is *false*, throw a *TypeError* exception. + 1. If ! IsReadableStreamAsyncIterator(*this*) is *false*, a promise rejected with *TypeError* exception. 1. Let _reader_ be *this*.[[asyncIteratorReader]]. 1. If _reader_.[[ownerReadableStream]] is *undefined*, return a promise rejected with a *TypeError* exception. 1. Return ! ReadableStreamDefaultReaderRead(_reader_, *true*). @@ -941,15 +941,18 @@ prototype, are created with the internal slots described in the following table: for="ReadableStreamAsyncIteratorPrototype">return( value ) - 1. If ! IsReadableStreamAsyncIterator(*this*) is *false*, throw a *TypeError* exception. + 1. If ! IsReadableStreamAsyncIterator(*this*) is *false*, return a promise rejected with *TypeError* exception. 1. Let _reader_ be *this*.[[asyncIteratorReader]]. 1. If *this*.[[preventCancel]] is *false*, then: 1. If _reader_.[[ownerReadableStream]] is *undefined*, return a promise rejected with a *TypeError* exception. + 1. If _reader_.[[readRequests]] is not empty, return a promise rejected with a *TypeError* exception. 1. Let _result_ be ! ReadableStreamReaderGenericCancel(_reader_, _value_). 1. Perform ! ReadableStreamReaderGenericRelease(_reader_). 1. Return the result of transforming _result_ by a fulfillment handler that returns ! ReadableStreamCreateReadResult(_value_, *true*, *true*). + 1. If _reader_.[[ownerReadableStream]] is *undefined*, return a promise rejected with a *TypeError* exception. + 1. If _reader_.[[readRequests]] is not empty, return a promise rejected with a *TypeError* exception. 1. Perform ! ReadableStreamReaderGenericRelease(_reader_). 1. Return a promise resolved with ! ReadableStreamCreateReadResult(_value_, *true*, *true*). diff --git a/reference-implementation/.eslintrc.json b/reference-implementation/.eslintrc.json index 42f6adb83..6b5123511 100644 --- a/reference-implementation/.eslintrc.json +++ b/reference-implementation/.eslintrc.json @@ -5,7 +5,7 @@ "es6": true }, "parserOptions": { - "ecmaVersion": 6 + "ecmaVersion": 2018 }, "globals": { "ReadableStream": false, diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js index 435793d0a..fe2190d9f 100644 --- a/reference-implementation/lib/readable-stream.js +++ b/reference-implementation/lib/readable-stream.js @@ -267,8 +267,61 @@ class ReadableStream { const branches = ReadableStreamTee(this, false); return createArrayFromList(branches); } + + getIterator({ preventCancel = false } = {}) { + if (IsReadableStream(this) === false) { + throw streamBrandCheckException('getIterator'); + } + const reader = AcquireReadableStreamDefaultReader(this); + const iterator = Object.create(ReadableStreamAsyncIteratorPrototype); + iterator._asyncIteratorReader = reader; + iterator._preventCancel = Boolean(preventCancel); + return iterator; + } } +const AsyncIteratorPrototype = Object.getPrototypeOf(Object.getPrototypeOf(async function* () {}).prototype); +const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({ + next() { + if (IsReadableStreamAsyncIterator(this) === false) { + return Promise.reject(new TypeError()); + } + const reader = this._asyncIteratorReader; + if (reader._ownerReadableStream === undefined) { + return Promise.reject(new TypeError()); + } + return ReadableStreamDefaultReaderRead(reader, true); + }, + + return(value) { + if (IsReadableStreamAsyncIterator(this) === false) { + return Promise.reject(new TypeError()); + } + const reader = this._asyncIteratorReader; + if (this.preventCancel === false) { + if (reader._ownerReadableStream === undefined) { + return Promise.reject(new TypeError()); + } + if (reader._readRequests.length > 0) { + return Promise.reject(new TypeError()); + } + const result = ReadableStreamReaderGenericCancel(reader, value); + ReadableStreamReaderGenericRelease(reader); + return result.then(() => ReadableStreamCreateReadResult(value, true, true)); + } + if (reader._ownerReadableStream === undefined) { + return Promise.reject(new TypeError()); + } + if (reader._readRequests.length > 0) { + return Promise.reject(new TypeError()); + } + ReadableStreamReaderGenericRelease(reader); + return Promise.resolve(ReadableStreamCreateReadResult(value, true, true)); + } +}, AsyncIteratorPrototype); + +ReadableStream.prototype[Symbol.asyncIterator] = ReadableStream.prototype.getIterator; + module.exports = { CreateReadableByteStream, CreateReadableStream, @@ -364,6 +417,18 @@ function IsReadableStreamLocked(stream) { return true; } +function IsReadableStreamAsyncIterator(x) { + if (!typeIsObject(x)) { + return false; + } + + if (!Object.prototype.hasOwnProperty.call(x, '_asyncIteratorReader')) { + return false; + } + + return true; +} + function ReadableStreamTee(stream, cloneForBranch2) { assert(IsReadableStream(stream) === true); assert(typeof cloneForBranch2 === 'boolean'); From 8f37db082edcb17e8143e16a8a4e5f2e784fba57 Mon Sep 17 00:00:00 2001 From: Gus Caplan Date: Sun, 16 Sep 2018 10:23:35 -0500 Subject: [PATCH 6/7] squash! more --- index.bs | 2 +- .../lib/readable-stream.js | 20 ++++++++++++------- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/index.bs b/index.bs index c5ae8c64d..a9955ffe2 100644 --- a/index.bs +++ b/index.bs @@ -931,7 +931,7 @@ prototype, are created with the internal slots described in the following table:

next()

- 1. If ! IsReadableStreamAsyncIterator(*this*) is *false*, a promise rejected with *TypeError* exception. + 1. If ! IsReadableStreamAsyncIterator(*this*) is *false*, return a promise rejected with *TypeError* exception. 1. Let _reader_ be *this*.[[asyncIteratorReader]]. 1. If _reader_.[[ownerReadableStream]] is *undefined*, return a promise rejected with a *TypeError* exception. 1. Return ! ReadableStreamDefaultReaderRead(_reader_, *true*). diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js index fe2190d9f..e23e3fb32 100644 --- a/reference-implementation/lib/readable-stream.js +++ b/reference-implementation/lib/readable-stream.js @@ -284,36 +284,38 @@ const AsyncIteratorPrototype = Object.getPrototypeOf(Object.getPrototypeOf(async const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({ next() { if (IsReadableStreamAsyncIterator(this) === false) { - return Promise.reject(new TypeError()); + return Promise.reject(streamAsyncIteratorBrandCheckException('next')); } const reader = this._asyncIteratorReader; if (reader._ownerReadableStream === undefined) { - return Promise.reject(new TypeError()); + return Promise.reject(readerLockException('next')); } return ReadableStreamDefaultReaderRead(reader, true); }, return(value) { if (IsReadableStreamAsyncIterator(this) === false) { - return Promise.reject(new TypeError()); + return Promise.reject(streamAsyncIteratorBrandCheckException('next')); } const reader = this._asyncIteratorReader; if (this.preventCancel === false) { if (reader._ownerReadableStream === undefined) { - return Promise.reject(new TypeError()); + return Promise.reject(readerLockException('next')); } if (reader._readRequests.length > 0) { - return Promise.reject(new TypeError()); + return Promise.reject(new TypeError( + 'Tried to release a reader lock when that reader has pending read() calls un-settled')); } const result = ReadableStreamReaderGenericCancel(reader, value); ReadableStreamReaderGenericRelease(reader); return result.then(() => ReadableStreamCreateReadResult(value, true, true)); } if (reader._ownerReadableStream === undefined) { - return Promise.reject(new TypeError()); + return Promise.reject(readerLockException('next')); } if (reader._readRequests.length > 0) { - return Promise.reject(new TypeError()); + return Promise.reject(new TypeError( + 'Tried to release a reader lock when that reader has pending read() calls un-settled')); } ReadableStreamReaderGenericRelease(reader); return Promise.resolve(ReadableStreamCreateReadResult(value, true, true)); @@ -2021,6 +2023,10 @@ function streamBrandCheckException(name) { return new TypeError(`ReadableStream.prototype.${name} can only be used on a ReadableStream`); } +function streamAsyncIteratorBrandCheckException(name) { + return new TypeError(`ReadableStreamAsyncIterator.${name} can only be used on a ReadableSteamAsyncIterator`); +} + // Helper functions for the readers. function readerLockException(name) { From 291da255a58c9ac22e2695de450bab232d5a5210 Mon Sep 17 00:00:00 2001 From: Gus Caplan Date: Tue, 18 Sep 2018 10:51:42 -0500 Subject: [PATCH 7/7] squash! more --- index.bs | 7 ++----- .../lib/readable-stream.js | 21 +++++++------------ 2 files changed, 9 insertions(+), 19 deletions(-) diff --git a/index.bs b/index.bs index a9955ffe2..710d2290c 100644 --- a/index.bs +++ b/index.bs @@ -943,16 +943,13 @@ for="ReadableStreamAsyncIteratorPrototype">return( value ) 1. If ! IsReadableStreamAsyncIterator(*this*) is *false*, return a promise rejected with *TypeError* exception. 1. Let _reader_ be *this*.[[asyncIteratorReader]]. + 1. If _reader_.[[ownerReadableStream]] is *undefined*, return a promise rejected with a *TypeError* exception. + 1. If _reader_.[[readRequests]] is not empty, return a promise rejected with a *TypeError* exception. 1. If *this*.[[preventCancel]] is *false*, then: - 1. If _reader_.[[ownerReadableStream]] is *undefined*, return a promise rejected with a *TypeError* - exception. - 1. If _reader_.[[readRequests]] is not empty, return a promise rejected with a *TypeError* exception. 1. Let _result_ be ! ReadableStreamReaderGenericCancel(_reader_, _value_). 1. Perform ! ReadableStreamReaderGenericRelease(_reader_). 1. Return the result of transforming _result_ by a fulfillment handler that returns ! ReadableStreamCreateReadResult(_value_, *true*, *true*). - 1. If _reader_.[[ownerReadableStream]] is *undefined*, return a promise rejected with a *TypeError* exception. - 1. If _reader_.[[readRequests]] is not empty, return a promise rejected with a *TypeError* exception. 1. Perform ! ReadableStreamReaderGenericRelease(_reader_). 1. Return a promise resolved with ! ReadableStreamCreateReadResult(_value_, *true*, *true*). diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js index e23e3fb32..22f90b584 100644 --- a/reference-implementation/lib/readable-stream.js +++ b/reference-implementation/lib/readable-stream.js @@ -288,7 +288,7 @@ const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({ } const reader = this._asyncIteratorReader; if (reader._ownerReadableStream === undefined) { - return Promise.reject(readerLockException('next')); + return Promise.reject(readerLockException('iterate')); } return ReadableStreamDefaultReaderRead(reader, true); }, @@ -298,25 +298,18 @@ const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({ return Promise.reject(streamAsyncIteratorBrandCheckException('next')); } const reader = this._asyncIteratorReader; - if (this.preventCancel === false) { - if (reader._ownerReadableStream === undefined) { - return Promise.reject(readerLockException('next')); - } - if (reader._readRequests.length > 0) { - return Promise.reject(new TypeError( - 'Tried to release a reader lock when that reader has pending read() calls un-settled')); - } - const result = ReadableStreamReaderGenericCancel(reader, value); - ReadableStreamReaderGenericRelease(reader); - return result.then(() => ReadableStreamCreateReadResult(value, true, true)); - } if (reader._ownerReadableStream === undefined) { - return Promise.reject(readerLockException('next')); + return Promise.reject(readerLockException('finish iterating')); } if (reader._readRequests.length > 0) { return Promise.reject(new TypeError( 'Tried to release a reader lock when that reader has pending read() calls un-settled')); } + if (this._preventCancel === false) { + const result = ReadableStreamReaderGenericCancel(reader, value); + ReadableStreamReaderGenericRelease(reader); + return result.then(() => ReadableStreamCreateReadResult(value, true, true)); + } ReadableStreamReaderGenericRelease(reader); return Promise.resolve(ReadableStreamCreateReadResult(value, true, true)); }