Asyncio tasks are not destroyed #16

Closed
belolap opened this issue Jul 20, 2018 · 4 comments

Comments

@belolap

belolap commented Jul 20, 2018

  • GraphQL AioWS version: 0.2.0
  • Python version: Python 3.6.6 (default, Jun 27 2018, 14:44:17)
  • Operating System: Debian Stretch

When I close the WebSocket connection to the server after a subscription, some asyncio tasks are not destroyed. If I reload the page of my web app several times, many of these tasks accumulate. This is possibly a memory leak.

Example app:

from aiohttp.web import Application, run_app, WebSocketResponse
from graphql_ws.aiohttp import AiohttpSubscriptionServer

from graphene import Schema, ObjectType, Field, ID

# Schema
class HistoryRecord(ObjectType):
    id = ID()

class Query(ObjectType):
    history = Field(HistoryRecord)
    def resolve_history(self, info):
        return HistoryRecord(id='first')

class Subscription(ObjectType):
    history = Field(HistoryRecord)
    def resolve_history(self, info):
        async def resolver(info):
            yield HistoryRecord(id='second')
        return resolver(info)

schema = Schema(query=Query, subscription=Subscription)

# Server
class Server(AiohttpSubscriptionServer):
    async def on_open(self, *args, **kwargs):
        print('Connection open')
        await super().on_open(*args, **kwargs)
    def on_close(self, *args, **kwargs):
        print('Connection close')
        super().on_close(*args, **kwargs)
subscription_server = Server(schema)

# WS handler
async def subscriptions(request):
    ws = WebSocketResponse(protocols=('graphql-ws',))
    await ws.prepare(request)

    await subscription_server.handle(ws)
    return ws

# Application
app = Application()
app.router.add_get('/ws', subscriptions)

# List tasks
from asyncio import Task, get_event_loop, sleep
from sys import getrefcount
from time import time

async def test():
    count = 0
    while True:
        await sleep(15)
        count += 1
        print('. Timer iteration N', count)

        tasks = Task.all_tasks()
        print('.. Total tasks: %s' % len(tasks))
        for task in sorted(tasks, key=lambda x: str(x)):
            print('... refs={} {}'.format(getrefcount(task), task))

loop = get_event_loop()
loop.create_task(test())

# Fire!
run_app(app, host='0.0.0.0', port=8888)

Task list after 3 connections. Full log attached.

Connection open
. Timer iteration N 6
.. Total tasks: 14
... refs=4 <Task finished coro=<AiohttpSubscriptionServer._handle() done, defined at /home/gik/src/graphql/graphql-ws/graphql_ws/aiohttp.py:54> result=None>
... refs=4 <Task finished coro=<AiohttpSubscriptionServer._handle() done, defined at /home/gik/src/graphql/graphql-ws/graphql_ws/aiohttp.py:54> result=None>
... refs=4 <Task finished coro=<AiohttpSubscriptionServer.on_connection_init() done, defined at /home/gik/src/graphql/graphql-ws/graphql_ws/aiohttp.py:93> result=None>
... refs=4 <Task finished coro=<AiohttpSubscriptionServer.on_connection_init() done, defined at /home/gik/src/graphql/graphql-ws/graphql_ws/aiohttp.py:93> result=None>
... refs=5 <Task finished coro=<AiohttpSubscriptionServer.on_connection_init() done, defined at /home/gik/src/graphql/graphql-ws/graphql_ws/aiohttp.py:93> result=None>
... refs=6 <Task finished coro=<AiohttpSubscriptionServer.on_start() done, defined at /home/gik/src/graphql/graphql-ws/graphql_ws/aiohttp.py:101> result=None>
... refs=4 <Task finished coro=<AiohttpSubscriptionServer.on_start() done, defined at /home/gik/src/graphql/graphql-ws/graphql_ws/aiohttp.py:101> result=None>
... refs=4 <Task finished coro=<AiohttpSubscriptionServer.on_start() done, defined at /home/gik/src/graphql/graphql-ws/graphql_ws/aiohttp.py:101> result=None>
... refs=6 <Task finished coro=<RequestHandler.start() done, defined at /usr/lib/python3/dist-packages/aiohttp/web_protocol.py:340> result=None>
... refs=5 <Task finished coro=<RequestHandler.start() done, defined at /usr/lib/python3/dist-packages/aiohttp/web_protocol.py:340> result=None>
... refs=4 <Task finished coro=<RequestHandler.start() done, defined at /usr/lib/python3/dist-packages/aiohttp/web_protocol.py:340> result=None>
... refs=5 <Task pending coro=<AiohttpSubscriptionServer._handle() running at /home/gik/src/graphql/graphql-ws/graphql_ws/aiohttp.py:62> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fcec832af48>()]> cb=[shield.<locals>._done_callback() at /usr/lib/python3.6/asyncio/tasks.py:688]>
... refs=8 <Task pending coro=<RequestHandler.start() running at /usr/lib/python3/dist-packages/aiohttp/web_protocol.py:376> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fcec832ab58>()]>>
... refs=6 <Task pending coro=<test() running at ./bin/ex.py:63>>
Connection close

logfile.txt
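
A note for anyone rerunning the task-listing loop above on a newer interpreter: `Task.all_tasks()` was deprecated in Python 3.7 and removed in 3.9, and its replacement `asyncio.all_tasks()` returns only tasks that have not finished yet. The finished entries seen in the log would therefore not show up there. A minimal sketch of that difference, where `worker` and `snapshot` are hypothetical names and not part of the example app:

```python
import asyncio


async def worker():
    # Stand-in for a short-lived coroutine such as a connection handler.
    await asyncio.sleep(0)


async def snapshot():
    finished = asyncio.create_task(worker())
    await finished  # this task is done from here on

    pending = asyncio.create_task(asyncio.sleep(3600))
    await asyncio.sleep(0)  # give the pending task a chance to start

    tasks = asyncio.all_tasks()  # only tasks that are not done yet
    result = (finished in tasks, pending in tasks)

    pending.cancel()  # do not leave the long sleep behind
    return result


finished_listed, pending_listed = asyncio.run(snapshot())
print(finished_listed, pending_listed)
```

On such versions, finished tasks that are still referenced would have to be found by other means, for example through `gc.get_objects()`.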

@ciscorn
Contributor

ciscorn commented Jul 22, 2018

@belolap asyncio.Task internally relies on the GC, so you need to use weakref (or just wrap with another function):

while True:
    ...
    tasks = weakref.WeakSet(Task.all_tasks())
    ...
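
The effect described above can be sketched in isolation. This is a minimal illustration and not code from graphql-ws: `short_job` is a hypothetical stand-in coroutine, and the ordinary set plays the role of the `tasks` variable in the reporter's loop. It compares a strong reference with a `weakref.WeakSet` holding the same finished task:

```python
import asyncio
import gc
import weakref


async def short_job():
    # Hypothetical stand-in for a coroutine that finishes quickly.
    await asyncio.sleep(0)


async def main():
    task = asyncio.ensure_future(short_job())
    strong = {task}                  # ordinary set: keeps the task alive
    weak = weakref.WeakSet([task])   # weak set: does not keep it alive

    await task
    del task
    await asyncio.sleep(0)  # let the loop drop its callback handle for the task
    gc.collect()
    alive_with_strong = len(weak)    # the strong set still references the task

    strong.clear()
    gc.collect()
    alive_after_clear = len(weak)    # no strong references are left
    return alive_with_strong, alive_after_clear


alive_with_strong, alive_after_clear = asyncio.run(main())
print(alive_with_strong, alive_after_clear)
```

A monitoring loop that stores the result of a task listing in a plain local variable behaves like `strong` here: the finished tasks stay reachable until that variable is rebound.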

@dfee
Member

dfee commented Sep 13, 2018

The problem is here:

await self.send_message(connection_context, op_id, GQL_COMPLETE)

When on_start finishes, nothing removes its task from the connection's operations, so the finished task stays referenced.
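
One common way to avoid this kind of leftover entry is to unregister each task from a done callback. The sketch below is a generic illustration under assumptions, not the graphql-ws implementation or the fix that landed in master: `OperationRegistry`, its `operations` dict, and `fake_subscription` are hypothetical names.

```python
import asyncio


class OperationRegistry:
    """Hypothetical per-connection registry mapping op_id to its task."""

    def __init__(self):
        self.operations = {}

    def start(self, op_id, coro):
        task = asyncio.ensure_future(coro)
        self.operations[op_id] = task

        def _forget(done_task):
            # Remove the entry only if it still points at this task, so a
            # newer operation that reused the same op_id is left alone.
            if self.operations.get(op_id) is done_task:
                del self.operations[op_id]

        task.add_done_callback(_forget)
        return task


async def fake_subscription():
    # Stand-in for a handler that sends its results and then completes.
    await asyncio.sleep(0)


async def demo():
    registry = OperationRegistry()
    task = registry.start("1", fake_subscription())
    during = len(registry.operations)   # registered while running
    await task
    await asyncio.sleep(0)              # make sure done callbacks have run
    return during, len(registry.operations)


during, after = asyncio.run(demo())
print(during, after)
```

The done callback also fires when the task is cancelled or fails, so the entry is removed on every exit path and not only on normal completion.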

@shalakhin

Is there any solution to that problem at the moment?

@SmileyChris
Contributor

Should work now in master
