Not that great for implementing transducers? #48
---
@jedwards1211 have you tried awaiting the promise returned from `push`?

```js
const repeaterMap = (iterable, iteratee) =>
  new Repeater(async (push, stop) => {
    for await (const value of iterable) await push(iteratee(value))
    await stop
  })
```

I write about what the promise returned from `push` does in the overview:

---
Also some additional thoughts:

```js
async function *map(iterable, iteratee) {
  for await (const value of iterable) yield iteratee(value)
}
```

I also don’t export any combinator methods directly on the async iterator interface, cause this would widen the surface API, most combinators are expressible using async generators, and the iterator helper proposal would obsolete/clash with any instance methods I defined on the Repeater class.

---
Using an async generator suffers the secure stop issue though, and it’s especially a problem with GraphQL subs at the moment. I ended up using a raw iterator for map cases to solve the memory leaks I was having, because it’s the only way I could ensure cleanup happens promptly. I didn’t realize I can await `push`.

---
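For anyone following along, here is a minimal stdlib-only sketch (no Repeater involved, illustrative names) of the cleanup-delay behavior behind the secure stop issue: calling `return()` on an async generator that is suspended mid-`await` queues behind the in-flight `next()`, so `finally` cleanup is delayed until that `next()` settles.

```javascript
const events = [];

async function* slowSource() {
  try {
    // simulate waiting on an upstream event that takes a while to arrive
    await new Promise((resolve) => setTimeout(resolve, 50));
    yield 1;
  } finally {
    events.push('cleanup');
  }
}

const demo = (async () => {
  const start = Date.now();
  const iter = slowSource();
  const nextP = iter.next();     // generator suspends at the await inside try
  const returnP = iter.return(); // queued behind nextP; cannot interrupt the await
  const nextResult = await nextP;
  const returnResult = await returnP;
  return { nextResult, returnResult, events, elapsed: Date.now() - start };
})();

demo.then((r) => console.log(r));
```

Note that `returnP` only settles after the 50ms await completes, so teardown waits on the producer rather than happening immediately.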
Cool, so this works well:

```js
const repeaterMap = (iterable, iteratee) =>
  new Repeater(async (push, stop) => {
    let stopped = false
    stop.then(() => (stopped = true))
    for await (const value of iterable) {
      if (stopped) break
      await push(iteratee(value))
    }
    await stop
  })
```

I didn’t make it very clear, but even though this was a toy example that doesn’t need cleanup, I was trying to test whether cleanup would occur based upon either the input being exhausted or the output being returned.

---
I think you said it best when you said: tc39/proposal-async-iteration#126 (comment)

Unless I’m misunderstanding your secure stop problem, repeaters wouldn’t help you here anyways, because the repeater executor won’t execute until the first time `next` is called, so there‘s always the possibility that even when wrapping your iterator in a `repeaterMap` call, you still never reach the cleanup code. Lemme know if you have any other questions or concerns or if anything is unclear.

---
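The lazy-start point above also holds for plain async generators, which repeaters deliberately mirror: the body does not run until `next()` is called, so wrapping alone never triggers any setup or cleanup logic. A tiny stdlib-only demonstration (illustrative names):

```javascript
let started = false;

async function* lazy() {
  started = true; // runs only once the generator is actually pulled
  yield 1;
}

const it = lazy();
const startedBeforeNext = started; // still false: merely creating it runs nothing

const demo = it.next().then((r) => ({ startedAfterNext: started, result: r }));
demo.then((d) => console.log(startedBeforeNext, d));
```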
Here's some test code I wrote to illustrate the secure stop problem you're talking about when using my revised `repeaterMap`. Theoretically I could implement map with a Repeater in such a way that it can pass `return` through to the input.

Code:

```js
const { RedisPubSub } = require('graphql-redis-subscriptions')
const { Repeater } = require('@repeaterjs/repeater')
const debug = require('debug')('pubsub')

class LoggingPubSub extends RedisPubSub {
  publish(topic, ...args) {
    debug(' publish', topic)
    return super.publish(topic, ...args)
  }
  subscribe(trigger, ...args) {
    debug(' subscribe', trigger)
    return super.subscribe(trigger, ...args)
  }
  unsubscribe(subId) {
    const [triggerName = null] = this.subscriptionMap[subId] || []
    debug(' unsubscribe', triggerName)
    return super.unsubscribe(subId)
  }
}

const pubsub = new LoggingPubSub()

const repeaterMap = (iterable, iteratee) =>
  new Repeater(async (push, stop) => {
    let stopped = false
    stop.then(() => (stopped = true))
    for await (const value of iterable) {
      if (stopped) break
      await push(iteratee(value))
    }
    await stop
  })

const rawMap = (iterable, iteratee) => ({
  iterator: iterable[Symbol.asyncIterator](),
  next() {
    return this.iterator
      .next()
      .then(i => (i.done ? i : { value: iteratee(i.value), done: i.done }))
  },
  return() {
    return this.iterator.return()
  },
  throw(error) {
    return this.iterator.throw(error)
  },
  [Symbol.asyncIterator]() {
    return this
  },
})

async function subscribe(iterable, time) {
  const iterator = iterable[Symbol.asyncIterator]()
  async function iterate() {
    iterator.next().then(({ value, done }) => {
      if (!done) iterate()
    })
  }
  iterate()
  await new Promise(resolve =>
    setTimeout(() => {
      debug(' calling return()')
      iterator.return()
      resolve()
    }, time)
  )
}

async function go() {
  let i = 0
  setInterval(() => pubsub.publish('foo', i++), 10000)
  console.log('without async generator')
  await subscribe(pubsub.asyncIterator('foo'), 2000)
  console.log('\nwith raw map')
  await subscribe(rawMap(pubsub.asyncIterator('foo'), i => i * 2), 2000)
  console.log('\nwith repeaterMap')
  await subscribe(repeaterMap(pubsub.asyncIterator('foo'), i => i * 2), 2000)
}
go()
```

Output

---
Hmmmm, so I read through the code and went on a walk to think about it, and here’s my interpretation: the problem isn’t repeaters or async generators but graphql-redis-subscriptions’ async iterator. In addition, however, this partly works because the raw map async iterator has potentially surprising behavior where calls to `return` can settle before earlier calls to `next`:

```js
(async () => {
  const iter = rawMap(sourceIter, (i) => i * 2);
  const nextP = iter.next();
  const returnP = iter.return();
  console.log(await Promise.race([returnP, nextP]));
})();
```

the race might resolve to the result of `return` rather than `next`. If my reading of the code is correct, the same applies to the pubsub iterator:

```js
import { RedisPubSub } from 'graphql-redis-subscriptions';

const pubsub = new RedisPubSub();
const iter = pubsub.asyncIterator("SOMETHING_CHANGED");
const p1 = iter.next();
const p2 = iter.return("I WON");
Promise.race([p2, p1]).then((value) => console.log(value));
// {value: "I WON", done: true}
```

If the above code doesn’t print what I expect then I’ve misunderstood the graphql-redis-subscriptions code (sorry, I don’t feel like spinning up a redis instance right now to test 😛).

---
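A stdlib-only sketch contrasting the two settlement behaviors being discussed (illustrative names, no redis needed): a native async generator processes next/return requests strictly in call order, while a hand-rolled iterator whose `next()` hangs can settle `return()` first.

```javascript
async function* nativeGen() {
  yield 1;
}

// a hand-rolled iterator whose next() never settles but whose return() does
const rawIter = {
  next: () => new Promise(() => {}),
  return: () => Promise.resolve({ value: 'I WON', done: true }),
  [Symbol.asyncIterator]() {
    return this;
  },
};

const demo = (async () => {
  // native async generator: return() queues behind the pending next()
  const g = nativeGen();
  const nextP = g.next();
  const returnP = g.return('late');
  const nativeWinner = await Promise.race([returnP, nextP]);

  // raw iterator: return() settles immediately, overtaking the pending next()
  const rawWinner = await Promise.race([rawIter.return(), rawIter.next()]);

  return { nativeWinner, rawWinner };
})();

demo.then((r) => console.log(r));
```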
I would probably go inform the async-iterator-helper peeps of this possible complication.

---
Even if you call

---
So you're right that that code could probably be improved, but in

---
Ahh, I see.

To be clear, I wasn’t criticizing the code; I don’t think it’s of poor quality, and it seems pretty readable as far as async code usually goes. My secret wish is that someone takes the repeater codebase and rewrites it, keeping only the unit tests, because I feel like the code has gotten pretty hairy to handle a bunch of important edge cases.

---
Here’s how you could implement a map combinator with concurrent returns with repeaters:

```js
function map(iterable, fn) {
  return new Repeater(async (push, stop) => {
    const iter = iterable[Symbol.asyncIterator]();
    let finalIteration;
    stop.then(() => {
      finalIteration = finalIteration || iter.return();
    });
    while (!finalIteration) {
      const iteration = await iter.next();
      if (iteration.done) {
        finalIteration = finalIteration || iteration;
        break;
      }
      await push(fn(iteration.value));
    }
    // TODO: only await finalIteration if it’s a promise
    finalIteration = await finalIteration;
    return finalIteration.value;
  });
}
```

Probably can be cleaned up a bit, but as you can see, repeaters are still somewhat valuable here.

---
Awaiting

---
The main other thing about doing that with a repeater is that it seems like a lot more overhead than the raw async iterator. If

---
It only really matters if you’re the kind of person who counts microticks. Actually, we can clean it up a little like so:

```js
function map(iterable, fn) {
  return new Repeater(async (push, stop) => {
    const iter = iterable[Symbol.asyncIterator]();
    let finalIteration;
    stop.then(() => {
      finalIteration = typeof iter.return === "function" ? iter.return() : { done: true };
    });
    while (!finalIteration) {
      const iteration = await iter.next();
      if (iteration.done) {
        stop();
        return iteration.value;
      }
      await push(fn(iteration.value));
    }
    // there’s no need to return finalIteration’s value here, because when a repeater
    // is returned, the return value will just be whatever was passed into the
    // `return` method.
    await finalIteration;
  });
}
```

I completely respect this decision. However, here are some things you get for free when you opt into repeaters:

These are all really hard things to implement yourself with a raw async iterator.

---
I can keep tweaking the code snippet above to handle more situations: maybe you want to race iter.next with the final iteration, or pass an arg to the mapped iterator’s next method, or throw an error into the inner iterator. I just don’t have the time to edit it right now. If you paste what you end up with, I can take a look and provide feedback.

---
Really the only other thing I can imagine needing to do in the near future is yield an initial value before going on to the Redis events. Probably the cleanest way to do that would be to push the initial value and then pass through events from the non-async-iterator API in the executor.

---
My main takeaway from this issue:

---

If you need any help doing this, feel free to ping me, email me, or whatever; I’d be happy to help. Part of me doesn’t want to shoulder the responsibility of trying to make graphql subscriptions more reliable or scalable, but after looking through the codebases I think they could definitely benefit from using something more repeater-like. If you need help convincing people or need some changes to the repeater codebase, let me know.

---
Well, it really depends on what you're dealing with; if your input async iterable is just the lines of a file, then using an async generator for

---
Hi! Struggling to learn about async programming, async iterators, repeaters, alternatives, etc., as I try to implement subscriptions. I will have to merge async iterables, map them, filter them, etc., so the implemented functionality, the functionality explained above, and possibly the v4.0 functionality all seem pretty appealing. In consideration of taking the plunge with repeaters, I hit upon the following statement in the discussion above that seemed like an important point:

Maybe you might have time to elaborate on how raw async iterators do not achieve this and repeaters do? And why is this desirable? Is this statement meant only with respect to next() and return(), or even with different calls to next()? Thanks in advance from an async noob.

Also, I have to map, merge, filter, etc. async iterables that may hang, so I am planning on using the Repeater.race/timeout example to make sure that the for await loop does not hang forever. Some additional questions about consumption of repeaters: (1) Is the for await loop "safe" for consuming async iterables, bottom line, given all of the above? I think the answer is "yes", or "yes, as long as you don't care that cleanup might happen after the next call to next". (2) If I am using Repeater.race because I think the promise might never settle, does using Repeater.race mean I don't have to worry about memory issues related to hanging promises? I think the answer is "no", there may still be memory issues, but I am stumped what to do about that besides complaining to whomever passed me the subscriber function that THEY didn't implement a timeout...

---
In terms of the things settling in call order, is that only in cases when you are awaiting the push, as in:

So that you do not push an additional event until the first settles? But your guide mentions other patterns too, of course, right?

---
By iteration I just mean any call to `next`. Most of the time, this stuff isn’t a problem, because in 90% of use-cases you wait for the current iteration before pulling the next. But in those situations where you pull iterations concurrently, you likely need all the help you can get, and having iterations settle in order is just one less thing you have to worry about. Additionally, it was a requirement for repeaters, because one of the key design decisions was that repeaters should be indistinguishable from async generator objects, and having calls settle out of order would be a minor disparity between repeaters and async generators.
Long-running promises are not supposed to cause memory leaks, but in practice they often do, because we only keep promises around to be awaited or queried. Every time you call Promise.race with a long-lived promise, another callback is attached to it that cannot be collected until the promise settles. This problem is actually exacerbated by calling Promise.race against the same promise repeatedly, as in a loop. Repeater.race is safe to use with long-running promises, because it avoids calling Promise.race against the hanging promise on every iteration.
You do not need to

Feel free to open new issues or discussions if you need any help!

---
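A stdlib-only illustration (illustrative names, no Repeater) of the "settle in call order" guarantee discussed above: a native async generator fulfills concurrent `next()` calls strictly in the order they were made, even when the second value is instantly available; repeaters are designed to match this behavior.

```javascript
async function* gen() {
  // first value arrives slowly, second would be available instantly
  yield await new Promise((resolve) => setTimeout(() => resolve('slow'), 30));
  yield 'fast';
}

const order = [];
const it = gen();
const p1 = it.next().then((r) => {
  order.push('first');
  return r;
});
const p2 = it.next().then((r) => {
  order.push('second');
  return r;
});

const demo = Promise.all([p1, p2]).then(([r1, r2]) => ({ order, r1, r2 }));
demo.then((d) => console.log(d));
```

Even though `'fast'` costs nothing to produce, the second call cannot settle before the first.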
Closing this issue as the original question seems to have been resolved, and other stuff is tracked in other issues.

---
Thanks so much, really helpful!

---
Curious, is it possible to streamline a bit further? More specifically, in the latest revision above, we are not returning the value of the final iteration, so do we still need to await it?

---
If the map iterator is returned prematurely, we need to call return on the source iterator. We may not use that final iteration value, but this final return call can take an indeterminate amount of time, and it can also throw an error. Not awaiting it would allow for potential unhandled promise rejections, because you’re floating the promise.

---
Makes sense, thanks.

---
Implementation adapted from: repeaterjs/repeater#48 (comment) so that all payloads will be delivered in the original order
repeaterjs/repeater#48 (comment) add additional tests
I'm having problems using the transducer above when `throw` is called on it; I think it's because the awaited push rejection is stuck behind the subsequent next of the underlying iterator.

---
@yaacovCR Can you open another issue? Sounds interesting!

---
Since the API is so similar to Observable, I find it great for consuming observable-like things, but I'm not as sold on using it for non-observable-like things, like transducers or other async iterator combiners.

It seems almost simpler to implement transducers like `map` correctly using the raw async iterator protocol than using `Repeater`. The naive approach (in `repeaterMap` below) is to use a `for await` loop, but this keeps consuming input after the caller returns, which would be especially bad if the input is infinite. Whereas a raw async iterator can immediately forward return and throw calls to the input.

A less naive attempt (`awkwardRepeaterMap`) is to do the `for await` in a separate function and break once stop is detected. However, more calls to `next()` still go through than with the raw async iterator.

Thoughts?

Code

Output
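A stdlib-only sketch of the point above (illustrative names): a raw-iterator map can forward `return()` to its input immediately, so the source stops being consumed as soon as the caller stops.

```javascript
function rawMap(iterable, fn) {
  const iter = iterable[Symbol.asyncIterator]();
  return {
    next: () =>
      iter.next().then((r) => (r.done ? r : { value: fn(r.value), done: false })),
    return: () => (iter.return ? iter.return() : Promise.resolve({ done: true })),
    [Symbol.asyncIterator]() {
      return this;
    },
  };
}

// a source that counts how many times it is pulled and whether it was closed
function makeCountingSource() {
  let pulls = 0;
  let closed = false;
  return {
    get pulls() { return pulls; },
    get closed() { return closed; },
    next: () => Promise.resolve({ value: pulls++, done: false }),
    return: () => {
      closed = true;
      return Promise.resolve({ value: undefined, done: true });
    },
    [Symbol.asyncIterator]() { return this; },
  };
}

const source = makeCountingSource();
const mapped = rawMap(source, (n) => n * 2);

const demo = (async () => {
  const first = await mapped.next(); // pulls exactly once from the source
  await mapped.return();             // closes the source immediately
  return { first, pulls: source.pulls, closed: source.closed };
})();

demo.then((r) => console.log(r));
```

After `return()`, the source has been pulled exactly once and closed, with no extra `next()` calls going through.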