Crashes while performing asyncio.gather tasks #634

Closed
myraygunbarrel opened this issue Mar 5, 2020 · 3 comments
Labels
question A community question, closed when inactive.

Comments

@myraygunbarrel

  • GINO version: 0.8.6
  • Python version: 3.8
  • asyncpg version: 0.20.1
  • aiocontextvars version: 0.2.2
  • PostgreSQL version: 12.2

Description

Here is a minimal FastAPI + GINO service:

import asyncio

import uvicorn
from fastapi import FastAPI
from gino.ext.starlette import Gino

DATABASE_CONFIG = {
    'user': 'django',
    'database': 'django',
    'host': 'localhost',
    'port': 6432,
    'password': 'django'
}

db = Gino(None, **DATABASE_CONFIG)
app = FastAPI()
db.init_app(app)


@app.get("/")
async def root():
    tasks = [db.scalar('SELECT now()') for i in range(5)]
    result = await asyncio.gather(*tasks)
    print(result)
    return {"message": 'OK'}


if __name__ == '__main__':
    uvicorn.run(
        'snippet:app',
        host='localhost',
        port=9090,
        reload=True,
        debug=True
    )

What I Did

I tried to run several queries concurrently, but got this error:

INFO:     SELECT now()
INFO:     ()
INFO:     SELECT now()
INFO:     ()
INFO:     SELECT now()
INFO:     ()
INFO:     SELECT now()
INFO:     ()
INFO:     SELECT now()
INFO:     ()
ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/Users/ruslan/telemetry/venv/lib/python3.8/site-packages/uvicorn/protocols/http/httptools_impl.py", line 385, in run_asgi
    result = await app(self.scope, self.receive, self.send)
  File "/Users/ruslan/telemetry/venv/lib/python3.8/site-packages/uvicorn/middleware/proxy_headers.py", line 45, in __call__
    return await self.app(scope, receive, send)
  File "/Users/ruslan/telemetry/venv/lib/python3.8/site-packages/uvicorn/middleware/debug.py", line 81, in __call__
    raise exc from None
  File "/Users/ruslan/telemetry/venv/lib/python3.8/site-packages/uvicorn/middleware/debug.py", line 78, in __call__
    await self.app(scope, receive, inner_send)
  File "/Users/ruslan/telemetry/venv/lib/python3.8/site-packages/fastapi/applications.py", line 140, in __call__
    await super().__call__(scope, receive, send)
  File "/Users/ruslan/telemetry/venv/lib/python3.8/site-packages/starlette/applications.py", line 134, in __call__
    await self.error_middleware(scope, receive, send)
  File "/Users/ruslan/telemetry/venv/lib/python3.8/site-packages/starlette/middleware/errors.py", line 178, in __call__
    raise exc from None
  File "/Users/ruslan/telemetry/venv/lib/python3.8/site-packages/starlette/middleware/errors.py", line 156, in __call__
    await self.app(scope, receive, _send)
  File "/Users/ruslan/telemetry/venv/lib/python3.8/site-packages/gino/ext/starlette.py", line 80, in __call__
    await conn.release()
  File "/Users/ruslan/telemetry/venv/lib/python3.8/site-packages/gino/engine.py", line 276, in release
    await dbapi_conn.release(True)
  File "/Users/ruslan/telemetry/venv/lib/python3.8/site-packages/gino/engine.py", line 44, in release
    return await self._release()
  File "/Users/ruslan/telemetry/venv/lib/python3.8/site-packages/gino/engine.py", line 80, in _release
    await self._pool.release(conn)
  File "/Users/ruslan/telemetry/venv/lib/python3.8/site-packages/gino/dialects/asyncpg.py", line 227, in release
    await self._pool.release(conn)
  File "/Users/ruslan/telemetry/venv/lib/python3.8/site-packages/asyncpg/pool.py", line 654, in release
    return await asyncio.shield(ch.release(timeout))
  File "/Users/ruslan/telemetry/venv/lib/python3.8/site-packages/asyncpg/pool.py", line 216, in release
    raise ex
  File "/Users/ruslan/telemetry/venv/lib/python3.8/site-packages/asyncpg/pool.py", line 206, in release
    await self._con.reset(timeout=budget)
  File "/Users/ruslan/telemetry/venv/lib/python3.8/site-packages/asyncpg/connection.py", line 1114, in reset
    await self.execute(reset_query, timeout=timeout)
  File "/Users/ruslan/telemetry/venv/lib/python3.8/site-packages/asyncpg/connection.py", line 272, in execute
    return await self._protocol.query(query, timeout)
  File "asyncpg/protocol/protocol.pyx", line 301, in query
  File "asyncpg/protocol/protocol.pyx", line 664, in asyncpg.protocol.protocol.BaseProtocol._check_state
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress

Is there a proper way to run such queries concurrently?

@Pentusha

Pentusha commented Mar 5, 2020

Check this issue #572

@fantix
Member

fantix commented Mar 5, 2020

Yes, that's a good reference, thank you @Pentusha .

To add to that, there are two facts to know first:

  • The Starlette extension by default enables use_connection_for_request
  • GINO uses contextvars to keep track of contextual connections

That means in your code, a lazy connection is stored in the context right before entering the root handler. Because the 5 tasks you created inherit the handler's context, they all share the same connection. When they all try to query the database at the same time (the PostgreSQL client protocol is not usually multiplexed), asyncpg raises this error because the connection is already busy with another operation.
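
The context sharing described above can be reproduced with the standard library alone. This is only a sketch, not GINO or asyncpg code: FakeConnection, current_conn and query are hypothetical names, and the busy flag merely imitates a connection that accepts one operation at a time.

```python
import asyncio
import contextvars

# Hypothetical stand-in for a contextual connection (not GINO's real internals).
current_conn = contextvars.ContextVar("current_conn", default=None)


class FakeConnection:
    """Imitates a connection that allows only one operation at a time."""

    def __init__(self):
        self.busy = False

    async def scalar(self, sql):
        if self.busy:
            raise RuntimeError("another operation is in progress")
        self.busy = True
        try:
            await asyncio.sleep(0)  # yield to the event loop, as network I/O would
            return sql
        finally:
            self.busy = False


async def query(sql):
    # Each task started by gather() runs in a copy of the caller's context,
    # so every task finds the same connection object here.
    return await current_conn.get().scalar(sql)


async def handler():
    current_conn.set(FakeConnection())  # plays the role of the per-request connection
    return await asyncio.gather(
        *(query("SELECT now()") for _ in range(5)),
        return_exceptions=True,
    )


results = asyncio.run(handler())
failed = [r for r in results if isinstance(r, RuntimeError)]
```

In this simulation only the first query completes; the other four find the connection busy and fail, which mirrors the behaviour in the traceback above.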

By the way, Starlette (and some other frameworks) handles different requests in different tasks. It is fine to run concurrent database queries in these tasks because they don't share the same context, so they run their queries on different database connections.

To solve your issue, you could turn off use_connection_for_request, but this will only work if no other contextual connection is created manually. For example, this will still fail:

DATABASE_CONFIG = {
    ....
    'use_connection_for_request': False,
}

@app.get("/")
async def root():
    async with db.acquire():
        tasks = [db.scalar('SELECT now()') for i in range(5)]
        result = await asyncio.gather(*tasks)

Another, more reliable solution is to enforce a new context for each subtask, as @Pentusha pointed out in the reference. In that case, the contextual connection will not be inherited by the subtasks, and each subtask will get its own new connection.
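
One way to give each subtask a new context, using only the standard library, might look like the sketch below. It is not taken from the referenced issue: spawn_in_new_context, current_conn and seen_by_subtask are hypothetical names, and the string stored in the variable stands in for a real connection.

```python
import asyncio
import contextvars

# Hypothetical stand-in for a contextual connection variable.
current_conn = contextvars.ContextVar("current_conn", default=None)


def spawn_in_new_context(coro):
    """Schedule coro as a task whose context starts out empty."""
    # A Task copies the context that is current when it is created, so
    # creating it inside an empty Context keeps the parent's variables out.
    return contextvars.Context().run(asyncio.ensure_future, coro)


async def seen_by_subtask():
    # Report what this task finds in the context variable.
    return current_conn.get()


async def handler():
    current_conn.set("request connection")
    # Plain gather(): subtasks inherit the handler's context.
    inherited = await asyncio.gather(*(seen_by_subtask() for _ in range(3)))
    # Fresh context per subtask: the variable falls back to its default.
    isolated = await asyncio.gather(
        *(spawn_in_new_context(seen_by_subtask()) for _ in range(3))
    )
    return inherited, isolated


inherited, isolated = asyncio.run(handler())
```

Here the subtasks started with plain gather() see the handler's value, while the ones started through the helper see only the default, so a library that looks up its connection in the context would have to acquire a new one.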

Alternatively, you could explicitly acquire new connections in the subtasks like this:

async def now():
    async with db.acquire():
        return await db.scalar("SELECT now()")

@app.get("/")
async def root():
    async with db.acquire():
        tasks = [now() for i in range(5)]
        result = await asyncio.gather(*tasks)

@myraygunbarrel
Author

Thanks a lot, this solution works for me!

fantix added the question label Mar 6, 2020