No means for secure stopping async generators #126
I'm not even sure what …

That is, that is not the directionality of async iterators - if …

regardless it is …
Not eagerly; when `next` is first called, it awaits. And so, we cannot exit the loop from the caller, and the only problem here is the queue. For

```js
const src = { /* ... */ }

async function* h1() {
  for await (const i of src) {
    yield some(i)
  }
}

async function* caller() {
  for await (const i of merge(h1(), somethingElse)) {
    if (someCondition)
      break
  }
}
```

and the simplified transpilation of it:

```js
const src = { /* ... */ }

async function* h1() {
  const loop = src[Symbol.asyncIterator]()
  try {
    for (let item; !(item = await loop.next()).done;) {
      yield some(item.value)
    }
  } finally {
    await loop.return()
  }
}

async function* caller() {
  const loop = merge(h1(), somethingElse)[Symbol.asyncIterator]()
  try {
    for (let item; !(item = await loop.next()).done;) {
      if (someCondition)
        break
    }
  } finally {
    await loop.return()
  }
}
```

Say the control encounters the `break`: the caller's `finally` then awaits `loop.return()`. If we wouldn't have the queue, the `return` could settle right away. With the queue, the `return` request is queued behind the `next` that `h1` is still blocked on, so it never settles.
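A runnable sketch of this deadlock (the generator and helper names are mine): once an async generator is suspended in a pending `await`, a `return()` request is enqueued behind it and never settles, so the `finally` block never runs.

```js
// Sketch: an async generator blocked in a never-resolving await.
async function* blocked() {
  try {
    yield 1;
    await new Promise(() => {}); // never settles
  } finally {
    console.log("finally"); // never reached
  }
}

async function demo() {
  const g = blocked();
  await g.next(); // resumes up to `yield 1`
  g.next(); // resumes into the never-resolving await; this promise pends forever
  // return() is enqueued behind the pending next() and never settles either:
  return Promise.race([
    g.return(0).then(() => "returned"),
    new Promise((r) => setTimeout(r, 100, "timed out")),
  ]);
}

demo().then(console.log); // "timed out"
```

The 100 ms timeout is only there to prove that `return()` did not settle; without it the race would simply hang.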
I have the same issue! I want to write an iterator adapter that makes an async iterator cancelable.

```js
const net = require("net");

async function* iterateStream(stream) {
  let ended = false, error, wake = () => {};
  stream.on("readable", wake)
    .on("end", () => { ended = true; wake(); })
    .on("error", (err) => { error = err; wake(); });
  for (;;) {
    for (let chunk = stream.read(); chunk; chunk = stream.read()) {
      yield chunk;
    }
    if (error) { throw error; }
    if (ended) { return; }
    await new Promise(resolve => wake = resolve);
  }
}

async function* withTimeout(timeoutMs, iter) {
  let timer;
  try {
    const timeout = new Promise((_, reject) => {
      timer = setTimeout(reject, timeoutMs, new Error("Timeout!"));
    });
    for (;;) {
      const result = await Promise.race([timeout, iter.next()]);
      if (result.done) { break; }
      yield result.value;
    }
  } finally {
    clearTimeout(timer);
    if (iter.return) { await iter.return(); }
  }
}

async function main() {
  const socket = net.connect(80, "httpbin.org");
  try {
    for await (const val of withTimeout(1000, iterateStream(socket))) {
      console.log("[main] got value %s", val);
    }
  } finally {
    socket.end();
  }
}
```

This also gets stuck in the `finally` block: `await iter.return()` never settles, because `iterateStream` is still blocked on its pending `next` and the `return` request just joins the queue. It seems to me that …
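One workaround for this shape of problem, sketched under my own naming (the helper is hypothetical): a hand-rolled iterator object has no implicit request queue, so its `return()` can settle immediately even while the inner iterator is stuck in a pending `next()`.

```js
function cancelable(iter) {
  let done = false;
  return {
    [Symbol.asyncIterator]() { return this; },
    next(v) {
      return done
        ? Promise.resolve({ value: undefined, done: true })
        : iter.next(v);
    },
    return(value) {
      done = true;
      // Fire-and-forget: do not await the inner return(), which may never
      // settle if the inner async generator is blocked in its queue.
      if (iter.return) iter.return().catch(() => {});
      return Promise.resolve({ value, done: true });
    },
  };
}

async function demo() {
  async function* stuck() {
    try { await new Promise(() => {}); } finally { /* may never run */ }
  }
  const it = cancelable(stuck());
  it.next(); // pends forever inside the generator
  const r = await it.return("bye"); // settles immediately anyway
  return r.done;
}

demo().then(console.log); // true
```

Note the trade-off: the inner generator's `finally` may still never run; this only unblocks the consumer.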
If I understand all this correctly, this seems very unfortunate, since async generators otherwise would make implementing async iterators very easy!
There are a couple of things going on here. First, I notice that in all the provided examples we are awaiting a never-resolving promise inside of a `finally` block (directly or indirectly). That is dangerous even without generators:

```js
async function f() {
  try {
    throw new Error('oops');
  } finally {
    console.log('finally f');
    // Never-resolving
    await new Promise(() => {});
  }
}

async function g() {
  try {
    await f();
  } finally {
    // This is never executed
    console.log('finally g');
  }
}

g();
```

The lesson here is: be careful about putting an `await` inside a `finally` block.

The other thing I see is that we are assuming that calling `return` should be able to interrupt an async generator that is currently awaiting. As we see in these examples, the cancellation signal needs to propagate through async generator functions (and async functions), into their async subroutines. Because promises themselves are not cancelable, that signal has to travel through another channel. The DOM has invented `AbortController`/`AbortSignal` for this purpose.
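To illustrate that "other channel" idea, here is a hedged sketch using `AbortController` (standard in the DOM and in Node 15+; the helper names are mine): aborting makes the pending `await` reject promptly, so the generator's `finally` actually runs instead of hanging.

```js
// delay() that can be interrupted through an AbortSignal; the signal is
// the explicit cancellation channel, since promises alone cannot be canceled.
function delay(ms, signal) {
  return new Promise((resolve, reject) => {
    const t = setTimeout(resolve, ms);
    signal.addEventListener("abort", () => {
      clearTimeout(t);
      reject(new Error("aborted"));
    }, { once: true });
  });
}

let cleanedUp = false;

async function* ticks(signal) {
  try {
    for (let i = 0; ; i++) {
      await delay(1000, signal);
      yield i;
    }
  } finally {
    cleanedUp = true; // runs, because the await rejects instead of hanging
  }
}

async function demo() {
  const ac = new AbortController();
  const g = ticks(ac.signal);
  const p = g.next(); // generator is now suspended inside delay()
  ac.abort();
  await p.catch(() => {}); // next() rejects with "aborted"
  return cleanedUp;
}

demo().then(console.log); // true
```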
Reminds me of tc39/proposal-async-await#51, when I pondered if `finally` in …
No, at least in my example the `finally` block is never executed, and this is why it is a problem.

I see absolutely no problem in this example; someone could put a never-resolving promise in a `finally` block deliberately as well.

No, I highlighted it in the message: Promise cancelation is not required, if that is what you mean by async calculation.

Let's keep it practical, without resorting to interpretation of philosophical terms. I might as well call it a clean generator object.

I don't get this. What is not linear? Everything is kept linear without the queue in the generator scope as well; outside the scope, everything is async as usual. I see no problem again. What is another channel? Does another channel mean no async generators implementing the async iterators protocol?

This won't work; there is no other channel, unless all async iterators interpret the token themselves in their own code. But that just means: no async generators. There is no way to handle any token while control is resting in `await`. Also @zenparsing, in the first message I actually copied your implementation of converting an async generator to an observable.

In my message it is some possible transpilation result, not the actual `finally` usage.
I think the queue as it currently exists should remain, but I still think it would be worth reviving some form of syntactic cancellation mechanism to make this sort of thing easier (what is one to do with …).

I don't really see how the cancellation proposal could solve this issue in its current form, unless it was decided that …

Even sending the cancellation token forward with `function.sent` seems quite bad:

```js
function map(asyncIterable, iteratee) {
  return {
    async* [Symbol.asyncIterator]() {
      const iterator = getIterator(asyncIterable)
      try {
        while (true) {
          const cancelToken = function.sent;
          const { done, value } = await iterator.next(cancelToken);
          if (done) break;
          yield iteratee(value);
        }
      } finally {
        await iterator.return()
      }
    }
  }
}
```

This seems quite bad compared to the naive implementation of map that just loops through the items and yields.
But why? I mean, I've of course read the motivation, like making something run in parallel with … While of course something implementing the Async Iterator protocol but not Generator could do this, the thing may handle the order itself in whatever way it likes. It may be a queue, it may be a priority queue, it may ignore something, etc. It is also easy to write a wrapper adding a queue to any other Async Iterator, e.g. …
That's not true for arbitrary async iterators, though.
I would love to see an implementation of combineLatest with the semantics proposed by @awto though - I looked at all my implementations from when I gave a talk, and they suffer from this problem. I haven't had proper time to think about it, but I definitely think it's worth solving, and the spec is worth amending if it can't be.
Indeed, but there is no queue for arbitrary async iterators anyway; the iterator itself can decide how to handle the requests.
Yes, but it's because other types of async iterators exist that generators need to queue. If a consumer consumes two values eagerly and then invokes e.g. …

One might implement a preloading version of take like this:

```js
function eagerTakeN(asyncIterable, n) {
  return {
    async* [Symbol.asyncIterator]() {
      const iterator = getIterator(asyncIterable);
      // Preload values as fast as possible into a buffer of a given size
      const buffer = [];
      for (let i = 0; i < n; i++) {
        buffer.push(iterator.next());
      }
      // Allow the iterator to stop queuing now and clean up when *it*
      // wants to
      const done = iterator.return();
      const results = await Promise.all(buffer);
      for (const { done, value } of results) {
        if (done) {
          return value;
        }
        yield value;
      }
      await done;
    }
  };
}
```

Note that if you know the iterables are generators it makes no difference to consume multiple concurrently, but if it's an arbitrary iterable it might be better to consume earlier. If you want to cancel the …
@Jamesernator …
The thing is, cancellation allows you to have both, which is why I want to see a better cancellation proposal than just a builtin CancelToken type or the like. What you're proposing effectively requires that e.g. …

Consider this generator that needs to free a lock immediately after producing a value:

```js
async function* readFromDatabase() {
  const cursor = db.getCursor('someTable');
  while (true) {
    // This await could be interrupted
    const dbLock = await db.acquireLock();
    const value = cursor.nextValue();
    dbLock.unlock();
    yield value;
  }
}
```

But from what I can tell, under your proposed change, if it were to be cancelled via `return` while suspended at that `await`, the lock could be acquired and never released.
Fair point. I don't really mind much about the cancelation, I just wanted something simpler. Still, a solution to this problem is possible; it can be some db context where we can signal the lock is no longer required in …
Actually, the cancelation proposal will indeed solve the problem faster than something can be changed in this spec, so I'm closing the issue. Thanks, everyone, esp. @Jamesernator.
In case someone needs this urgently, you can use my transpiler - @effectful/es (sorry for the advertisement). In abstract interface mode, you can extend the interface with any functionality you want, including cancelation, no queue, or anything else.
@awto what cancellation proposal? :D
@benjamingr the cancellation proposal - I suppose it is the thing to solve the problem somehow in some future; there are not many details now, and apparently it will not happen soon. For my task, I'm doing a workaround with my transpiler for now.
There are libraries emerging with (not-safe) iterator combinators, so I think it is rather worth keeping this issue open.
Maybe talk to the people from https://github.com/jamiemccrindle/axax @jamiemccrindle
Does anyone have a simple test case that demonstrates this issue?
@jamiemccrindle I would at least add a warning on your page; axax is still usable if the sources are managed, but as a user I would like to be aware of possible problems, e.g.:

```js
const s1 = new Subject()
const s2 = new Subject()

async function* something() {
  try {
    for await (const i of s2.iterator)
      yield i
  } finally {
    console.log("never called")
  }
}

async function test() {
  for await (const i of merge(s1.iterator, something())) {
    break
  }
}

test()
s1.onNext(1)
```

Here is the demonstration of the leak with …
Thanks for the example @awto. I'll add a note to axax.
Would you say that this is comparable to a Promise that never settles? Something like:

```js
async function example() {
  await new Promise((resolve, reject) => {});
}

example().then(() => console.log('never gets here'));
```
I don't think so; there is still a way to finish the chain, e.g. by calling …
To be clear, the proposal I'm suggesting is: …
I thought more on my earlier comment: …

This solves …
I'm thinking about making a Babel plugin to wrap async generators and all …
I disagree with the idea that you have to reject async iterator promises in the case of …
@brainkim My apologies, you're right! I forgot that the iteration protocol allows the value passed to `return` to be yielded alongside `done: true`. Obviously you hadn't passed the Ballmer peak 😂 Happy holidays
Dude, I'm still not exactly thinking straight. Control would still return to a generator when a standalone promise it's awaiting settles. Only `yield` statements would prevent control from returning to the generator after `return`. So aside from terminating …
To clarify, we're pointing out two additional problems here which are similar to, but not exactly, the original issue.

First:

```js
async function* gen() {
  try {
    await myAsyncFn();
  } finally {
    myCleanupFn();
  }
}

const g = gen();
g.next();
g.return();
```

In the above code, `myCleanupFn` is not called until the promise returned by `myAsyncFn` settles, because the `return` request waits in the queue behind the pending `next`. My proposed solution, should we consider this an actual problem, is to have the … Changing the behavior to make …

Second:

```js
async function* gen() {
  try {
    yield new Promise(() => {});
  } finally {
    myCleanupFn();
  }
}

const g = gen();
g.next();
g.return(1);
```

In the above code, because calls to next/return/throw all settle in order, the call to `return` never settles. My proposed solution, should we consider this an actual problem, is to have pending calls to …

Ultimately, these are both only issues if you have hanging promises, and like I said, if you have hanging promises you have bigger issues than that your cleanup code isn't being called. The solutions I propose are only half-hearted, because I think it's important for async iterators/generators to communicate hanging promises to the developer early by preventing further calls to …

Let me know if any of this is unclear and I will re-explain until it makes sense.
If I understand correctly, this would cause unexpected behavior, for example:

```js
async function* gen() {
  try {
    const name = await getName();
    await fs.writeFile(JSON.stringify(name).replace(/^"|"$/g, '') + '.txt');
  } finally {
    myCleanupFn();
  }
}
```

If the generator is resumed before … It seems like you're hoping for a more elegant way to clean up than throwing, but there's really no surefire way to jump to the …
What I'm saying is that if the generator was suspended at … Again, I'm not really advocating for these solutions, more just proposing them to deny that promise cancellation is somehow "necessary."

Yes, exactly, which is why I believe the "problem" of …
You're right that hanging promises are a big problem... it's kind of scary considering that …
Yeah, I'm starting to agree with you. The only reason I really got into this rabbit hole is that GraphQL subscriptions use async iterators over endless event streams (which seems like a really questionable design decision by now); if I were only using async generators for things like mapping the lines of a file, I would never even have to think about problems like this.
Hmm, if I'm understanding you right, it seems like you've got another situation where you're doing something eagerly where you should be doing it lazily; namely, you're turning each event into a promise eagerly before passing it along to the async iterator. You shouldn't do anything like that; rather, you should be passing the events to the iterator as they happen, so the async iterator can return immediately when there are no pending events.

Ultimately, the issue with async iterators expressed here and elsewhere is that we've asked developers to hand-roll async queues themselves, and it's about as unfair as asking them to implement promises from scratch whenever they need them, with lots of subtle pitfalls which can have disastrous effects in production. And then at the same time there's an unrelenting stream of FUD from developers who would rather see observables or eigenmonads or whatever be adopted as the standard stream abstraction in js. Maybe if we unified around a common async iterator constructor cough repeaters cough we wouldn't have as many problems? Like, peek this:

```js
const socket = new WebSocket("ws://echo.websocket.org");
const messages = new Repeater(async (push, stop) => {
  socket.onmessage = (ev) => push(ev.data);
  socket.onerror = () => stop(new Error("WebSocket error"));
  socket.onclose = () => stop();
  await stop;
  socket.close();
});
```

I guarantee you could replace every single ad-hoc async iterator class in your graphql dependencies with repeaters and you'd have cleaner, less buggy code.
Just wanted to update this thread with a possible complication related to `Promise.race`. We can't use async generators and `Promise.race` together reliably. Code example:

```js
async function* map(iterable, fn) {
  for await (const value of iterable) {
    yield fn(value);
  }
}

(async () => {
  const doubleIter = map(iterable, (i) => i * 2);
  const p1 = doubleIter.next();
  // only has an effect after the previous line fulfills
  const p2 = doubleIter.return();
  Promise.race([p2, p1]).then(console.log); // p1 will always win
})();
```

Even though the `return` was requested, its promise cannot settle before the pending `next` does. The problem here is that the queue forces `p1` to always win the race.
We've experienced some issues with async generators causing GraphQL subscription cleanup to never be called. We solved it by replacing async generator functions with async iterators implemented using the raw protocol. While this doesn't fully solve the issue of promises being uncancellable (an unresolved promise might hang in memory forever), at the very least it allowed cleanup code to be called when iterator.return() is called. I hope this will help to mitigate this issue for some people. See this code sandbox for an illustration of the difference in behaviour: https://codesandbox.io/s/async-generator-vs-iterator-returns-s4sht?file=/src/App.js
Let's assume a very typical reactive programming use case: say I need to combine a few streams into a single one, yielding a value each time any of the original streams yields a new value.

For example, I use it for UI and convert DOM events into an async iterable with a function `subject` (I don't include its sources here, it is not significant for the problem - it may be checked in this gist). The function returns an object with the AsyncIterable interface and an additional method `send` to pass the event into the stream.

And there is a function `combine`, simply merging all async iterables in its arguments into a single one.

And now I iterate two such sources combined and exit the loop on some condition. Everything is pretty fine; I added a `console.log` in the exit function in `subject`, and all loops handle the subjects' exit. But now if I wrap one of the sources with any simple async generator, like `copy` (or just `yield*`), the test never exits (never calls `then`): it waits in the `await` in the combine's `finally` block. The reason is pretty simple and obvious: the `copy` generator is blocked on `next`, and generators use the same queue for `return`.

I searched issues in this repository and found these tickets: #5 and #55. There are no clean reasons for the single queue decision, except probably: …
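The `subject` sources are not included above; purely for illustration, a minimal sketch of what such a helper might look like (every detail here is my assumption, not the gist's code):

```js
// A subject: an AsyncIterable plus a send() method to push events in,
// with a log in return() to observe when a consumer exits the loop.
function subject() {
  const queue = [];
  let resolveNext = null;
  return {
    send(value) {
      if (resolveNext) {
        resolveNext({ value, done: false });
        resolveNext = null;
      } else {
        queue.push(value);
      }
    },
    [Symbol.asyncIterator]() {
      return {
        [Symbol.asyncIterator]() { return this; },
        next() {
          if (queue.length) {
            return Promise.resolve({ value: queue.shift(), done: false });
          }
          return new Promise((r) => { resolveNext = r; });
        },
        return() {
          console.log("exit"); // the exit log mentioned in the text
          return Promise.resolve({ value: undefined, done: true });
        },
      };
    },
  };
}

async function demo() {
  const s = subject();
  s.send(1);
  s.send(2);
  const received = [];
  for await (const v of s) {
    received.push(v);
    if (received.length === 2) break; // triggers return() -> "exit"
  }
  return received;
}

demo().then((r) => console.log(r.join(","))); // "1,2"
```

Because `return()` here settles immediately, such a hand-rolled subject exits cleanly; the leak appears only once an async generator is wrapped around it.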
This is maybe better to be done this way: …

And how would we convert an async generator into an observable? For example, the solution from #20 won't work. The cleanup function won't stop the source generator (even if we add a `return` call for the object returned by `drain`) if there are no more values sent by the original iterable; it is forever locked in `next`.
**Current status**

There is nothing to do in this proposal to fix this. It will be fixed automatically after some form of async cancelation operation is added as part of another proposal. This issue is left open as a warning about the problem.

**Workarounds**

*Using Transducers (lifted generators)*

Transducers are functions taking a stream as input and producing another stream. Generators can read their input passed as an argument, and it is there we can send some specific `stop` signal if needed. See the more detailed description here: …

There is a `fork` function which merges streams and stops well, but it merges lifted streams - transducers.

*Transpiler*

The @effectful/es transpiler adds lightweight cancelation support to async functions. Resulting Promises of async functions transpiled by it are amended with `M.cancelSymbol`, which is a function that tries to cancel execution of the function. It executes `finally` blocks and propagates cancelation to the current `await` expression. It doesn't do anything more (e.g. no propagation to children).