Skip to content

Commit

Permalink
Add ReadableStream.from(asyncIterable)
Browse files Browse the repository at this point in the history
This static method takes an async iterable and returns a ReadableStream pulling chunks from that async iterable. Sync iterables (including arrays and generators) are also supported, since GetIterator() already has all the necessary handling to adapt a sync iterator into an async iterator.

Closes #1018.
  • Loading branch information
MattiasBuelens authored Jun 8, 2023
1 parent 058f290 commit 8d7a0bf
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 3 deletions.
58 changes: 58 additions & 0 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ urlPrefix: https://tc39.es/ecma262/; spec: ECMASCRIPT
text: Number type; url: #sec-ecmascript-language-types-number-type
text: Data Block; url: #sec-data-blocks
type: abstract-op
text: Call; url: #sec-call
text: CloneArrayBuffer; url: #sec-clonearraybuffer
text: CopyDataBlockBytes; url: #sec-copydatablockbytes
text: CreateArrayFromList; url: #sec-createarrayfromlist
Expand All @@ -53,9 +54,14 @@ urlPrefix: https://tc39.es/ecma262/; spec: ECMASCRIPT
text: Construct; url: #sec-construct
text: DetachArrayBuffer; url: #sec-detacharraybuffer
text: Get; url: #sec-get-o-p
text: GetIterator; url: #sec-getiterator
text: GetMethod; url: #sec-getmethod
text: GetV; url: #sec-getv
text: IsDetachedBuffer; url: #sec-isdetachedbuffer
text: IsInteger; url: #sec-isinteger
text: IteratorComplete; url: #sec-iteratorcomplete
text: IteratorNext; url: #sec-iteratornext
text: IteratorValue; url: #sec-iteratorvalue
text: OrdinaryObjectCreate; url: #sec-ordinaryobjectcreate
text: SameValue; url: #sec-samevalue
text: Type; url: #sec-ecmascript-data-types-and-values
Expand Down Expand Up @@ -478,6 +484,8 @@ The Web IDL definition for the {{ReadableStream}} class is given as follows:
interface ReadableStream {
constructor(optional object underlyingSource, optional QueuingStrategy strategy = {});

static ReadableStream from(any asyncIterable);

readonly attribute boolean locked;

Promise<undefined> cancel(optional any reason);
Expand Down Expand Up @@ -808,6 +816,13 @@ option. If {{UnderlyingSource/type}} is set to undefined (including via omission
|underlyingSource|, |underlyingSourceDict|, |highWaterMark|, |sizeAlgorithm|).
</div>

<div algorithm>
The static <dfn id="rs-from" method for="ReadableStream">from(|asyncIterable|)</dfn> method steps
are:

1. Return ? [$ReadableStreamFromIterable$](|asyncIterable|).
</div>

<div algorithm>
The <dfn id="rs-locked" attribute for="ReadableStream">locked</dfn> getter steps are:

Expand Down Expand Up @@ -2095,6 +2110,49 @@ The following abstract operations operate on {{ReadableStream}} instances at a h
1. Return true.
</div>

<div algorithm>
<dfn abstract-op lt="ReadableStreamFromIterable" id="readable-stream-from-iterable">
ReadableStreamFromIterable(|asyncIterable|)</dfn> performs the following steps:

1. Let |stream| be undefined.
1. Let |iteratorRecord| be ? [$GetIterator$](|asyncIterable|, async).
1. Let |startAlgorithm| be an algorithm that returns undefined.
1. Let |pullAlgorithm| be the following steps:
1. Let |nextResult| be [$IteratorNext$](|iteratorRecord|).
1. If |nextResult| is an abrupt completion, return [=a promise rejected with=]
|nextResult|.\[[Value]].
1. Let |nextPromise| be [=a promise resolved with=] |nextResult|.\[[Value]].
1. Return the result of [=reacting=] to |nextPromise| with the following fulfillment steps,
given |iterResult|:
1. If [$Type$](|iterResult|) is not Object, throw a {{TypeError}}.
1. Let |done| be ? [$IteratorComplete$](|iterResult|).
1. If |done| is true:
1. Perform ! [$ReadableStreamDefaultControllerClose$](|stream|.[=ReadableStream/[[controller]]=]).
1. Otherwise:
1. Let |value| be ? [$IteratorValue$](|iterResult|).
1. Perform ! [$ReadableStreamDefaultControllerEnqueue$](|stream|.[=ReadableStream/[[controller]]=],
|value|).
<!-- TODO (future): If we allow changing the queuing strategy, this Enqueue might throw.
We'll then need to catch the error and close the async iterator. -->
1. Let |cancelAlgorithm| be the following steps, given |reason|:
1. Let |iterator| be |iteratorRecord|.\[[Iterator]].
1. Let |returnMethod| be [$GetMethod$](|iterator|, "`return`").
1. If |returnMethod| is an abrupt completion, return [=a promise rejected with=]
|returnMethod|.\[[Value]].
1. If |returnMethod|.\[[Value]] is undefined, return [=a promise resolved with=] undefined.
1. Let |returnResult| be [$Call$](|returnMethod|.\[[Value]], |iterator|, « |reason| »).
1. If |returnResult| is an abrupt completion, return [=a promise rejected with=]
|returnResult|.\[[Value]].
1. Let |returnPromise| be [=a promise resolved with=] |returnResult|.\[[Value]].
1. Return the result of [=reacting=] to |returnPromise| with the following fulfillment steps,
given |iterResult|:
1. If [$Type$](|iterResult|) is not Object, throw a {{TypeError}}.
1. Return undefined.
1. Set |stream| to ! [$CreateReadableStream$](|startAlgorithm|, |pullAlgorithm|, |cancelAlgorithm|,
0).
1. Return |stream|.
</div>

<div algorithm="ReadableStreamPipeTo">
<dfn abstract-op lt="ReadableStreamPipeTo"
id="readable-stream-pipe-to">ReadableStreamPipeTo(|source|, |dest|, |preventClose|, |preventAbort|,
Expand Down
4 changes: 4 additions & 0 deletions reference-implementation/lib/ReadableStream-impl.js
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ exports.implementation = class ReadableStreamImpl {
aos.ReadableStreamDefaultReaderRelease(reader);
return promiseResolvedWith(undefined);
}

static from(asyncIterable) {
return aos.ReadableStreamFromIterable(asyncIterable);
}
};

// See pipeTo()/pipeThrough() for why this is needed.
Expand Down
2 changes: 2 additions & 0 deletions reference-implementation/lib/ReadableStream.webidl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
interface ReadableStream {
constructor(optional object underlyingSource, optional QueuingStrategy strategy = {});

static ReadableStream from(any asyncIterable);

readonly attribute boolean locked;

Promise<undefined> cancel(optional any reason);
Expand Down
83 changes: 83 additions & 0 deletions reference-implementation/lib/abstract-ops/ecmascript.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ const assert = require('assert');

const isFakeDetached = Symbol('is "detached" for our purposes');

exports.typeIsObject = x => (typeof x === 'object' && x !== null) || typeof x === 'function';

exports.CreateArrayFromList = elements => {
// We use arrays to represent lists, so this is basically a no-op.
// Do a slice though just in case we happen to depend on the unique-ness.
Expand Down Expand Up @@ -39,3 +41,84 @@ exports.CanTransferArrayBuffer = O => {
exports.IsDetachedBuffer = O => {
return isFakeDetached in O;
};

exports.Call = (F, V, args = []) => {
if (typeof F !== 'function') {
throw new TypeError('Argument is not a function');
}

return Reflect.apply(F, V, args);
};

exports.GetMethod = (V, P) => {
const func = V[P];
if (func === undefined || func === null) {
return undefined;
}
if (typeof func !== 'function') {
throw new TypeError(`${P} is not a function`);
}
return func;
};

exports.CreateAsyncFromSyncIterator = syncIteratorRecord => {
// Instead of re-implementing CreateAsyncFromSyncIterator and %AsyncFromSyncIteratorPrototype%,
// we use yield* inside an async generator function to achieve the same result.

// Wrap the sync iterator inside a sync iterable, so we can use it with yield*.
const syncIterable = {
[Symbol.iterator]: () => syncIteratorRecord.iterator
};
// Create an async generator function and immediately invoke it.
const asyncIterator = (async function* () {
return yield* syncIterable;
}());
// Return as an async iterator record.
const nextMethod = asyncIterator.next;
return { iterator: asyncIterator, nextMethod, done: false };
};

exports.GetIterator = (obj, hint = 'sync', method) => {
assert(hint === 'sync' || hint === 'async');
if (method === undefined) {
if (hint === 'async') {
method = exports.GetMethod(obj, Symbol.asyncIterator);
if (method === undefined) {
const syncMethod = exports.GetMethod(obj, Symbol.iterator);
const syncIteratorRecord = exports.GetIterator(obj, 'sync', syncMethod);
return exports.CreateAsyncFromSyncIterator(syncIteratorRecord);
}
} else {
method = exports.GetMethod(obj, Symbol.iterator);
}
}
const iterator = exports.Call(method, obj);
if (!exports.typeIsObject(iterator)) {
throw new TypeError('The iterator method must return an object');
}
const nextMethod = iterator.next;
return { iterator, nextMethod, done: false };
};

exports.IteratorNext = (iteratorRecord, value) => {
let result;
if (value === undefined) {
result = exports.Call(iteratorRecord.nextMethod, iteratorRecord.iterator);
} else {
result = exports.Call(iteratorRecord.nextMethod, iteratorRecord.iterator, [value]);
}
if (!exports.typeIsObject(result)) {
throw new TypeError('The iterator.next() method must return an object');
}
return result;
};

exports.IteratorComplete = iterResult => {
assert(exports.typeIsObject(iterResult));
return Boolean(iterResult.done);
};

exports.IteratorValue = iterResult => {
assert(exports.typeIsObject(iterResult));
return iterResult.value;
};
63 changes: 61 additions & 2 deletions reference-implementation/lib/abstract-ops/readable-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ const assert = require('assert');
const { promiseResolvedWith, promiseRejectedWith, newPromise, resolvePromise, rejectPromise, uponPromise,
setPromiseIsHandledToTrue, waitForAllPromise, transformPromiseWith, uponFulfillment, uponRejection } =
require('../helpers/webidl.js');
const { CanTransferArrayBuffer, CopyDataBlockBytes, CreateArrayFromList, IsDetachedBuffer, TransferArrayBuffer } =
require('./ecmascript.js');
const { CanTransferArrayBuffer, Call, CopyDataBlockBytes, CreateArrayFromList, GetIterator, GetMethod, IsDetachedBuffer,
IteratorComplete, IteratorNext, IteratorValue, TransferArrayBuffer, typeIsObject } = require('./ecmascript.js');
const { CloneAsUint8Array, IsNonNegativeNumber } = require('./miscellaneous.js');
const { EnqueueValueWithSize, ResetQueue } = require('./queue-with-sizes.js');
const { AcquireWritableStreamDefaultWriter, IsWritableStreamLocked, WritableStreamAbort,
Expand Down Expand Up @@ -55,6 +55,7 @@ Object.assign(exports, {
ReadableStreamDefaultControllerHasBackpressure,
ReadableStreamDefaultReaderRead,
ReadableStreamDefaultReaderRelease,
ReadableStreamFromIterable,
ReadableStreamGetNumReadRequests,
ReadableStreamHasDefaultReader,
ReadableStreamPipeTo,
Expand Down Expand Up @@ -1879,3 +1880,61 @@ function SetUpReadableByteStreamControllerFromUnderlyingSource(
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, autoAllocateChunkSize
);
}

function ReadableStreamFromIterable(asyncIterable) {
let stream;
const iteratorRecord = GetIterator(asyncIterable, 'async');

const startAlgorithm = () => undefined;

function pullAlgorithm() {
let nextResult;
try {
nextResult = IteratorNext(iteratorRecord);
} catch (e) {
return promiseRejectedWith(e);
}
const nextPromise = promiseResolvedWith(nextResult);
return transformPromiseWith(nextPromise, iterResult => {
if (!typeIsObject(iterResult)) {
throw new TypeError('The promise returned by the iterator.next() method must fulfill with an object');
}
const done = IteratorComplete(iterResult);
if (done === true) {
ReadableStreamDefaultControllerClose(stream._controller);
} else {
const value = IteratorValue(iterResult);
ReadableStreamDefaultControllerEnqueue(stream._controller, value);
}
});
}

function cancelAlgorithm(reason) {
const iterator = iteratorRecord.iterator;
let returnMethod;
try {
returnMethod = GetMethod(iterator, 'return');
} catch (e) {
return promiseRejectedWith(e);
}
if (returnMethod === undefined) {
return promiseResolvedWith(undefined);
}
let returnResult;
try {
returnResult = Call(returnMethod, iterator, [reason]);
} catch (e) {
return promiseRejectedWith(e);
}
const returnPromise = promiseResolvedWith(returnResult);
return transformPromiseWith(returnPromise, iterResult => {
if (!typeIsObject(iterResult)) {
throw new TypeError('The promise returned by the iterator.return() method must fulfill with an object');
}
return undefined;
});
}

stream = CreateReadableStream(startAlgorithm, pullAlgorithm, cancelAlgorithm, 0);
return stream;
}

0 comments on commit 8d7a0bf

Please sign in to comment.