-
Notifications
You must be signed in to change notification settings - Fork 30
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Revert back to internal APIs to implement tee(), pipeTo(), async iterator and TransformStream #122
Conversation
e5264fc
to
8bdccbe
Compare
Hi @MattiasBuelens – in absence of your proposed EDIT: should this work? const body = response.body;
if (!body[Symbol.asyncIterator]) {
body[Symbol.asyncIterator] = async function* () {
const reader = (body as any as ReadableStream).getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
yield value;
}
};
} |
Maybe this will do: ReadableStream.prototype[Symbol.asyncIterator] ??= function () {
const reader = this.getReader()
return {
next () {
return reader.read()
},
return () {
return reader.releaseLock()
},
throw (err) {
this.return()
throw err
},
[Symbol.asyncIterator] () {
return this
}
}
} |
Thank you @jimmywarting ! That looks great. Is it important to release the readers' locks if the stream is canceled as well? (asking because I saw that here: https://gist.github.com/dy/8ca154c96e2b2a823c6501d29972b8a8) |
No, that's not necessary. While the async iterator has the reader lock, you can't call Note that the Streams standard by default will cancel the stream when returning from the async iterator. If you want to match that, you can add an Also, you don't need the |
Amazing, thank you both so much. For posterity, here's what we ended up with: function readableStreamAsyncIterable<T>(stream: any): AsyncIterableIterator<T> {
if (stream[Symbol.asyncIterator]) {
return stream;
}
const reader = stream.getReader();
return {
next() {
return reader.read();
},
async return() {
reader.cancel();
reader.releaseLock();
return { done: true, value: undefined };
},
[Symbol.asyncIterator]() {
return this;
},
};
}
async function usage() {
const response = await fetch(…);
const iter = readableStreamAsyncIterable<Bytes>(response.body);
for await (const chunk of iter) {
// …
}
} (We decided not to polyfill/monkeypatch in our scenario, but I imagine |
I wonder if it might be worth exposing a polyfill that just does this? Would you like a PR? (context in case you're curious: openai/openai-node#182 (comment)) |
The spec/MDN docs make it seem like you'd need to Reading the spec more carefully, |
The intended order is And while If you want to be as close as possible to the specced behavior, you can use this: function readableStreamAsyncIterable(stream, { preventCancel = false }) {
if (stream[Symbol.asyncIterator]) {
return stream[Symbol.asyncIterator]();
}
const reader = stream.getReader();
return {
async next() {
try {
const result = await reader.read();
if (result.done) reader.releaseLock(); // release lock when stream becomes closed
return result;
} catch (e) {
reader.releaseLock(); // release lock when stream becomes errored
throw e;
}
},
async return(arg) {
if (!preventCancel) {
const cancelPromise = reader.cancel(arg); // cancel first, but don't await yet
reader.releaseLock(); // release lock first
await cancelPromise; // now await it
} else {
reader.releaseLock();
}
return { done: true, value: undefined };
},
[Symbol.asyncIterator]() {
return this;
},
};
} |
Thank you so much @MattiasBuelens ! After spending some time with the spec, I agree that looks closer to what it seems to intend (there are some nonsensical things it asks for instead, like returning the cancel promise from I wonder if this is worth making available as its own polyfill? BTW, note that I had an error at the start, it should be: if (stream[Symbol.asyncIterator]) {
- return stream[Symbol.asyncIterator];
+ return stream;
} |
I have discontinued the See #1 (comment) for more information. |
In #98, #99, #100 and #105, I rewrote large parts of the polyfill to use the public
getReader()
andgetWriter()
APIs for most methods and classes, rather than relying on internal APIs such asAcquireReadableStreamDefaultReader
. The goal was to allow the polyfill to interoperate directly with native stream objects, such as aReadableStream
fromResponse.body
.Unfortunately, I now believe this is not going to work:
getReader()
through the public API, it still expects a reader that complies with the same version of the streams specification that the polyfill itself implements. However, if the specification makes changes to the reader's behavior (such as in Reject pending reads when releasing reader whatwg/streams#1168), then the polyfill has no way of knowing whether a given native reader implements this change or not (without resorting to browser sniffing).Instead, I think a better interoperability story would be to provide dedicated conversion methods, such as
ReadableStream.from(nativeStream)
. Users would need to manually convert streams "on the boundary", but at least they can expect predictable results afterwards from those converted streams.This PR reverts #98, #99, #100 and #105. We revert to (pretty much) the state of f43be79, see f43be79...8bdccbe.