Custom CursorIterator #1208

Open
letrec opened this issue Nov 4, 2024 · 2 comments

Comments

@letrec

letrec commented Nov 4, 2024

I'd like to implement my own CursorIterator, but I can't find a straightforward way to do it via asyncpg's public interface.
Currently it's created by CursorFactory, which is in turn created by PreparedStatement.cursor.
I can see two options here:

  • Introduce an additional parameter of CursorFactory type on PreparedStatement.cursor that defaults to None, where None means "use the standard CursorFactory" (see the sketch after this list)
  • Do the same in PreparedStatement.__init__
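
A rough sketch of how the first option could look at the call site (the cursor_factory keyword and MyCursorFactory are hypothetical, not existing asyncpg API):

# Hypothetical API: cursor_factory=None would keep today's behaviour,
# anything else would be used in place of the standard CursorFactory.
async with conn.transaction():
    async for row in stmt.cursor(*args, prefetch=1000, cursor_factory=MyCursorFactory):
        ...
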
@elprans
Member

elprans commented Nov 4, 2024

Which behaviors do you intend to implement in the custom class? Is there anything that would just make sense to integrate directly?

@letrec
Author

letrec commented Nov 6, 2024

I implemented an iterator that kicks off the next fetch before yielding the current result (see my concerns related to it here):

from asyncio import Task, create_task
from collections.abc import AsyncIterable, AsyncIterator

async def iter_fast[T](iterable: AsyncIterable[T]) -> AsyncIterator[T]:
    # Yield items from `iterable`, starting the fetch of the next item
    # before handing the current one to the consumer.
    iterator = aiter(iterable)
    pending: Task[T] | None = None
    while True:
        try:
            # First iteration fetches directly; afterwards await the prefetch task.
            if pending is None:
                item = await anext(iterator)
            else:
                item = await pending
        except StopAsyncIteration:
            break
        else:
            # Kick off the next fetch, then yield the current item.
            # Note: if the consumer stops iterating early, this task is left running.
            pending = create_task(anext(iterator))
            yield item

It's not possible to use it as is, since the current iterator fetches rows in batches but yields individual rows.
So what I ended up doing was implementing an iterator that yields whole batches (it's basically a copy of CursorIterator), wrapping it in iter_fast, and then wrapping it one more time in an iterator that flattens batches into individual rows:

async def flatten[T](iterable: AsyncIterable[list[T]]) -> AsyncIterator[T]:
    # Unnest prefetched batches back into individual rows.
    async for batch in iterable:
        for item in batch:
            yield item
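
For illustration, a similar batch-level iterator can be written against asyncpg's public cursor API (this is a simplified sketch, not the BatchIterator I actually used, which copies the internal CursorIterator; the fetch_batches name and batch_size parameter are just placeholders):

async def fetch_batches(conn, query, *args, batch_size=15_000, timeout=None):
    # Must run inside a transaction: asyncpg server-side cursors require one.
    cursor = await conn.cursor(query, *args, timeout=timeout)
    while batch := await cursor.fetch(batch_size, timeout=timeout):
        yield batch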

So the end result looks somewhat like this:

iterator = flatten(
    iter_fast(
        BatchIterator(
            conn, query, stmt._state, args, stmt._state.record_class,
            prefetch=15_000, timeout=timeout,
        )
    )
)
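
Consumption is then ordinary async iteration (inside a transaction, since the underlying cursor needs one):

async with conn.transaction():
    async for row in iterator:
        ...  # process each row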

It yields a performance improvement, though not as large as I had hoped.
