Inner async for loop caused cursor already closed error #535

Closed
playpauseandstop opened this issue Feb 1, 2019 · 8 comments · Fixed by #801
@playpauseandstop

After upgrading to aiopg==0.16.0, the following pseudo-code raises a "cursor already closed" InterfaceError:

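from typing import List

from aiohttp import web
from aiopg.sa import SAConnection
from aiopg.sa.result import RowProxy

# `query` and `convert_data` are placeholders defined elsewhere.

async def view(request: web.Request) -> web.Response:
    async with request.app['db'].acquire() as conn:
        data = await fetch_data(conn)
    return web.json_response(convert_data(data))

async def fetch_data(conn: SAConnection) -> List[RowProxy]:
    result: List[RowProxy] = []
    # Outer loop: a cursor stays open on `conn` while iterating.
    async for item in conn.execute(query):
        # The inner call runs a second query on the same connection.
        result.extend(await fetch_data_item(conn, item))
    return result

async def fetch_data_item(conn: SAConnection, item: RowProxy) -> List[RowProxy]:
    result: List[RowProxy] = []
    async for inner_item in conn.execute(query):
        result.append(inner_item)
    return result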

The exception itself is:

InterfaceError: cursor already closed
  ...
  File "/api/views.py", line 388, in view
    context.user)
  File "/api/storage.py", line 403, in fetch_data
    async for item in conn.execute(query):
  File "aiopg/utils.py", line 97, in __anext__
    return (yield from self._obj.__anext__())
  File "aiopg/sa/result.py", line 359, in __anext__
    ret = yield from self.fetchone()
  File "aiopg/sa/result.py", line 401, in fetchone
    row = yield from self._cursor.fetchone()
  File "asyncio/coroutines.py", line 120, in coro
    res = func(*args, **kw)
  File "aiopg/cursor.py", line 194, in fetchone
    ret = self._impl.fetchone()

The error occurs both when running the aiohttp.web server and in tests run by pytest. Downgrading to aiopg==0.15.0 fixes the issue. Any ideas why this happens? I can provide more debug information if needed.

OS: macOS 10.14.3, Ubuntu 16.04.3
Python version: 3.7.2
aiopg version: 0.16.0

@aio-libs-bot

GitMate.io thinks possibly related issues are #111 (aiopg closes event loop on failed connection), #277 (Asyncio gather assertion error: Don't support server side cursors yet), #43 (Error with sqlalchemy), #352 (Should cursor.mogrify be coroutine?), and #201 (begin_nested() contextmanager error).

@vir-mir
Member

vir-mir commented Feb 1, 2019

Hi @playpauseandstop, thanks! We are trying to solve the problem.

I wrote a test to reproduce the behavior:

import pytest
import sqlalchemy as sa
from sqlalchemy.sql.ddl import CreateTable

meta = sa.MetaData()
tbl = sa.Table('sa_tbl4', meta,
               sa.Column('id', sa.Integer, nullable=False, primary_key=True),
               sa.Column('name', sa.String(255), nullable=False,
                         default='default test'),
               sa.Column('is_active', sa.Boolean, default=True))


@pytest.fixture
def engine(make_engine, loop):
    async def start():
        engine = await make_engine()
        async with engine.acquire() as conn:
            await conn.execute('DROP TABLE IF EXISTS sa_tbl4')
            await conn.execute(CreateTable(tbl))

            await conn.execute(tbl.insert().values())
            await conn.execute(tbl.insert().values())
            await conn.execute(tbl.insert().values())

        return engine

    return loop.run_until_complete(start())


async def test_view(engine):
    async with engine.acquire() as conn:
        data = await fetch_data(conn)
        print(data)


async def fetch_data_item(conn):
    result = []
    async for item in conn.execute(tbl.select()):
        result.append(item)
    return result


async def fetch_data(conn):
    result = []
    async for item in conn.execute(tbl.select()):
        result.extend((item, await fetch_data_item(conn)))
    return result

I checked this behavior at the commit for PR #452, where the test fails:

(aiopg) $ git checkout 8cea23ea88664df52665250802316852116b9d6b
HEAD is now at 8cea23e... resolve issue 364 (#452)
(aiopg)$ py.test -s -v tests/test_t.py 
Test session starts (platform: linux, Python 3.7.2, pytest 3.9.1, pytest-sugar 0.9.1)
cachedir: .pytest_cache
rootdir: /srv/python/3/aiopg, inifile: setup.cfg
plugins: timeout-1.3.2, sugar-0.9.1, cov-2.6.0

engine = <aiopg.sa.engine.Engine object at 0x7f665ea33710>

    async def test_view(engine):
        async with engine.acquire() as conn:
>           data = await fetch_data(conn)
E       psycopg2.InterfaceError: cursor already closed

aiopg/cursor.py:194: InterfaceError

Before this commit, the test passes:

(aiopg) $ git checkout 52876e398d312e68d1c6c0ac04d1c65cf1319973
HEAD is now at 52876e3... Scheduled weekly dependency update for week 38 (#504)
(aiopg) $ py.test -s -v tests/test_t.py 
Test session starts (platform: linux, Python 3.7.2, pytest 3.9.1, pytest-sugar 0.9.1)
cachedir: .pytest_cache
plugins: timeout-1.3.2, sugar-0.9.1, cov-2.6.0
timeout: 300.0s
timeout method: signal
timeout func_only: False
[(1, 'default test', True), [(1, 'default test', True), (2, 'default test', True), (3, 'default test', True)], (2, 'default test', True), [(1, 'default test', True), (2, 'default test', True), (3, 'default test', True)], (3, 'default test', True), [(1, 'default test', True), (2, 'default test', True), (3, 'default test', True)]]

 tests/test_t.py::test_view[9.5] ✓                                                                                                                                                          
Results (11.80s):
       1 passed

@thehesiod maybe you have ideas?

@vir-mir
Member

vir-mir commented Feb 2, 2019

@playpauseandstop I thought about this issue.
I believe it is correct that the cursor is closed.
For background on server-side cursors (aka named cursors) and asynchronous connections, see the official docs: http://initd.org/psycopg/docs/advanced.html#asynchronous-support

async for implicitly calls fetchone, which moves the cursor to the next row. When fetchone reaches the end of the result set it closes the cursor, and the iteration then raises StopAsyncIteration. This repeats the logic from SQLAlchemy.
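
A toy model may make this mechanism concrete. The sketch below does not use aiopg: `FakeCursor`, `FakeResult`, `FakeConnection`, and `CursorClosedError` are hypothetical stand-ins, and the rule that a second `execute()` discards the connection's previous cursor is an assumption made for illustration, not a description of aiopg internals.

```python
import asyncio


class CursorClosedError(Exception):
    """Stand-in for psycopg2.InterfaceError in this simulation."""


class FakeCursor:
    """Toy cursor: fetchone() closes the cursor once the rows run out."""

    def __init__(self, rows):
        self._rows = list(rows)
        self.closed = False

    async def fetchone(self):
        if self.closed:
            raise CursorClosedError("cursor already closed")
        if not self._rows:
            self.closed = True  # reaching the end closes the cursor
            return None
        return self._rows.pop(0)

    def close(self):
        self.closed = True


class FakeResult:
    """Async iterator that drives the cursor through fetchone()."""

    def __init__(self, cursor):
        self._cursor = cursor

    def __aiter__(self):
        return self

    async def __anext__(self):
        row = await self._cursor.fetchone()
        if row is None:
            raise StopAsyncIteration
        return row


class FakeConnection:
    """Toy connection holding one cursor (assumption: execute() replaces it)."""

    def __init__(self):
        self._cursor = None

    def execute(self, rows):
        if self._cursor is not None:
            self._cursor.close()  # the previous cursor is discarded
        self._cursor = FakeCursor(rows)
        return FakeResult(self._cursor)


async def nested_iteration():
    """Nest two loops on one connection; return rows seen and the error."""
    conn = FakeConnection()
    seen = []
    try:
        async for outer in conn.execute([1, 2, 3]):
            seen.append(outer)
            async for _inner in conn.execute(["a", "b"]):
                pass
    except CursorClosedError as exc:
        return seen, str(exc)
    return seen, None
```

In this model, `asyncio.run(nested_iteration())` yields the first outer row and then fails when the outer loop asks its cursor, which is closed by then, for the next row.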

In issue #364, it was decided to hold one cursor per connection.

In the synchronous version of SQLAlchemy, the cursor is allocated implicitly during the call to the execute method. After execution, the cursor closes immediately.

We cannot do this in the asynchronous version: execute only runs the query, and we may still need to fetch the data afterwards, by which point the cursor would already be closed.

So you need to acquire a new connection from the pool if you want to run queries during the iteration.

It may be worth raising an error on an attempt to open a new cursor on a connection that already has one.

We will also try to document this behavior as soon as possible.

@thehesiod
Contributor

Yep, please read #364 (comment).

The issue here is that test_view passes the connection conn to fetch_data, which does an async for on conn.execute (creating cursor1), and then, inside this loop, passes conn to fetch_data_item, which itself tries to do another conn.execute. You cannot have two async cursors on one connection with psycopg2. The code needs to be restructured to pass the pool around instead, so that each execute has its own cursor.
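
One way to read that suggestion is sketched below, assuming `engine` is an aiopg.sa engine as in the test above. The function names mirror the test, but the `engine`, `outer_query`, and `inner_query` parameters are illustrative choices and not something the thread prescribes.

```python
async def fetch_data_item(engine, inner_query):
    """Run the inner query on its own connection taken from the pool."""
    async with engine.acquire() as conn:
        return [row async for row in conn.execute(inner_query)]


async def fetch_data(engine, outer_query, inner_query):
    """Iterate the outer query; inner queries never touch its connection."""
    result = []
    async with engine.acquire() as conn:
        async for item in conn.execute(outer_query):
            result.append(item)
            # A separate connection means a separate cursor, so the
            # outer cursor on `conn` stays open while this runs.
            result.extend(await fetch_data_item(engine, inner_query))
    return result
```

With this shape each level of nesting holds a connection of its own, so the pool needs at least as many free connections as there are nested levels; a pool that is too small could leave the inner `acquire()` waiting.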

@vir-mir
Member

vir-mir commented Feb 2, 2019

I think we can do something similar to _ConnectionContextManager, namely a _CursorExecuteContextManager for the conn.execute method.

Then code samples would look like this:

async def foo(engine):
    async with engine.acquire() as conn:
        print(await (await conn.execute('select ... ')).fetchall())

        async for row in await conn.execute('select ... '):
            print(row)
            print(await (await conn.execute('select ... ')).fetchall())

        async with conn.execute('insert or update...'):
            pass
        
        async for row in await conn.execute('select ... '):
            async with conn.execute('insert ... %s', row):
                pass

But it requires discussion.
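
For readers unfamiliar with the pattern, here is a rough sketch of an object that works both with `await` and with `async with`, which is what the samples above rely on. `ExecuteContext`, `DummyCursor`, `open_cursor`, and `execute` are hypothetical names for illustration only; this is not aiopg code and it does not lift the one-cursor-per-connection constraint.

```python
import asyncio


class DummyCursor:
    """Stand-in cursor that only tracks whether it has been closed."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


class ExecuteContext:
    """Hypothetical wrapper usable with `await` and with `async with`."""

    def __init__(self, coro):
        self._coro = coro
        self._cursor = None

    def __await__(self):
        # `await execute()` hands back the cursor and leaves it open.
        return self._coro.__await__()

    async def __aenter__(self):
        self._cursor = await self._coro
        return self._cursor

    async def __aexit__(self, exc_type, exc, tb):
        # `async with execute()` closes the cursor when the block ends.
        self._cursor.close()
        self._cursor = None


async def open_cursor():
    """Pretend to run a query and return its cursor."""
    return DummyCursor()


def execute():
    """Wrap the coroutine so callers can pick either calling style."""
    return ExecuteContext(open_cursor())
```

The design point is that the plain `await` form leaves cleanup to the caller, while the `async with` form closes the cursor deterministically on exit.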

@thehesiod
Contributor

That's not a good idea: a conn is a connection, and thus one cursor in psycopg2. You can't have conn.execute create a new cursor, because that would imply a new connection from the same connection object.

@asvetlov
Member

asvetlov commented Feb 2, 2019

One cursor per connection sounds reasonable.
Raising an exception on cursor creation if a connection already has a cursor that has not finished is a very good idea.
Better to have simple and strict rules than to fail rarely but mysteriously.
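
A minimal sketch of such a strict rule follows, assuming the connection simply remembers its most recent cursor. `GuardedConnection`, `GuardedCursor`, and `CursorInUseError` are hypothetical names; the check that aiopg eventually adopted may look different.

```python
class CursorInUseError(RuntimeError):
    """Raised when a second cursor is requested while one is still open."""


class GuardedCursor:
    """Stand-in cursor that only tracks whether it has been closed."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


class GuardedConnection:
    """Toy connection that allows at most one open cursor at a time."""

    def __init__(self):
        self._active = None

    def cursor(self):
        # Fail loudly at creation time instead of later, on fetch.
        if self._active is not None and not self._active.closed:
            raise CursorInUseError("connection already has an open cursor")
        self._active = GuardedCursor()
        return self._active
```

With a guard like this, the nested loop from the original report would fail at the inner `execute` with a clear message rather than at the next outer fetch.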

@vir-mir
Member

vir-mir commented Mar 23, 2019

Dear all, please see pull request #548.

@Pliner Pliner mentioned this issue Mar 21, 2021