Separated start coroutine #5386

Closed
Bluenix2 opened this issue Jan 7, 2021 · 5 comments

Bluenix2 commented Jan 7, 2021

🐣 Is your feature request related to a problem? Please describe.

I want to start an aiohttp.web.Application without blocking, so that I can continue executing further code afterwards.

💡 Describe the solution you'd like

I looked into it, and the ideal solution would be to rename the internal _run_app coroutine to start_app and then move the infinite while loop that sleeps into the run_app function.

Describe alternatives you've considered

For my use-case, I've considered copying the bodies of both functions and removing the logic that makes them blocking.

📋 Additional context

I've tried to fix it, but I get this:

aiohttp/web.py: marked executable but has no (or invalid) shebang!
  If it isn't supposed to be executable, try: `chmod -x aiohttp/web.py`
  If it is supposed to be executable, double-check its shebang.

(I am using Windows WSL)

I have already implemented the change, but I can't commit anything (all other tests pass). Here are the changed/important parts of the file:

async def start_app(
    app: Union[Application, Awaitable[Application]],
    *,
    host: Optional[Union[str, HostSequence]] = None,
    port: Optional[int] = None,
    path: Optional[str] = None,
    sock: Optional[socket.socket] = None,
    shutdown_timeout: float = 60.0,
    keepalive_timeout: float = 75.0,
    ssl_context: Optional[SSLContext] = None,
    print: Optional[Callable[..., None]] = print,
    backlog: int = 128,
    access_log_class: Type[AbstractAccessLogger] = AccessLogger,
    access_log_format: str = AccessLogger.LOG_FORMAT,
    access_log: Optional[logging.Logger] = access_logger,
    handle_signals: bool = True,
    reuse_address: Optional[bool] = None,
    reuse_port: Optional[bool] = None,
) -> AppRunner:
    """Start an app locally.

    Use `run_app` if you do not wish to control the event loop and keep it
    alive yourself. This function is non-blocking and returns the runner.
    """
    if asyncio.iscoroutine(app):
        app = await app  # type: ignore

    app = cast(Application, app)

    runner = AppRunner(
        app,
        handle_signals=handle_signals,
        access_log_class=access_log_class,
        access_log_format=access_log_format,
        access_log=access_log,
        keepalive_timeout=keepalive_timeout,
    )

    await runner.setup()

    sites = []  # type: List[BaseSite]

    if host is not None:
        if isinstance(host, (str, bytes, bytearray, memoryview)):
            sites.append(
                TCPSite(
                    runner,
                    host,
                    port,
                    shutdown_timeout=shutdown_timeout,
                    ssl_context=ssl_context,
                    backlog=backlog,
                    reuse_address=reuse_address,
                    reuse_port=reuse_port,
                )
            )
        else:
            for h in host:
                sites.append(
                    TCPSite(
                        runner,
                        h,
                        port,
                        shutdown_timeout=shutdown_timeout,
                        ssl_context=ssl_context,
                        backlog=backlog,
                        reuse_address=reuse_address,
                        reuse_port=reuse_port,
                    )
                )
    elif path is None and sock is None or port is not None:
        sites.append(
            TCPSite(
                runner,
                port=port,
                shutdown_timeout=shutdown_timeout,
                ssl_context=ssl_context,
                backlog=backlog,
                reuse_address=reuse_address,
                reuse_port=reuse_port,
            )
        )

    if path is not None:
        if isinstance(path, (str, bytes, bytearray, memoryview)):
            sites.append(
                UnixSite(
                    runner,
                    path,
                    shutdown_timeout=shutdown_timeout,
                    ssl_context=ssl_context,
                    backlog=backlog,
                )
            )
        else:
            for p in path:
                sites.append(
                    UnixSite(
                        runner,
                        p,
                        shutdown_timeout=shutdown_timeout,
                        ssl_context=ssl_context,
                        backlog=backlog,
                    )
                )

    if sock is not None:
        if not isinstance(sock, Iterable):
            sites.append(
                SockSite(
                    runner,
                    sock,
                    shutdown_timeout=shutdown_timeout,
                    ssl_context=ssl_context,
                    backlog=backlog,
                )
            )
        else:
            for s in sock:
                sites.append(
                    SockSite(
                        runner,
                        s,
                        shutdown_timeout=shutdown_timeout,
                        ssl_context=ssl_context,
                        backlog=backlog,
                    )
                )
    for site in sites:
        await site.start()

    if print:  # pragma: no branch
        names = sorted(str(s.name) for s in runner.sites)
        print(
            "======== Running on {} ========\n"
            "(Press CTRL+C to quit)".format(", ".join(names))
        )

    return runner


def _cancel_tasks(
    to_cancel: Set["asyncio.Task[Any]"], loop: asyncio.AbstractEventLoop
) -> None:
    if not to_cancel:
        return

    for task in to_cancel:
        task.cancel()

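    # The `loop` argument of asyncio.gather() was removed in Python 3.10;
    # the tasks already carry their loop, so it is not needed here.
    loop.run_until_complete(asyncio.gather(*to_cancel, return_exceptions=True))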

    for task in to_cancel:
        if task.cancelled():
            continue
        if task.exception() is not None:
            loop.call_exception_handler(
                {
                    "message": "unhandled exception during asyncio.run() shutdown",
                    "exception": task.exception(),
                    "task": task,
                }
            )


def run_app(
    app: Union[Application, Awaitable[Application]],
    *,
    debug: bool = False,
    host: Optional[Union[str, HostSequence]] = None,
    port: Optional[int] = None,
    path: Optional[str] = None,
    sock: Optional[socket.socket] = None,
    shutdown_timeout: float = 60.0,
    keepalive_timeout: float = 75.0,
    ssl_context: Optional[SSLContext] = None,
    print: Optional[Callable[..., None]] = print,
    backlog: int = 128,
    access_log_class: Type[AbstractAccessLogger] = AccessLogger,
    access_log_format: str = AccessLogger.LOG_FORMAT,
    access_log: Optional[logging.Logger] = access_logger,
    handle_signals: bool = True,
    reuse_address: Optional[bool] = None,
    reuse_port: Optional[bool] = None,
) -> None:
    """Run an app locally.

    Use `start_app` if you wish to control and keep the event loop
    alive yourself.
    """
    loop = asyncio.get_event_loop()
    loop.set_debug(debug)

    # Configure if and only if in debugging mode and using the default logger
    if loop.get_debug() and access_log and access_log.name == "aiohttp.access":
        if access_log.level == logging.NOTSET:
            access_log.setLevel(logging.DEBUG)
        if not access_log.hasHandlers():
            access_log.addHandler(logging.StreamHandler())

    try:

        async def _run_app() -> None:
            # Start the app before entering the try block, so that a failure
            # inside start_app() propagates as-is instead of being masked by
            # an unbound `runner` in the finally clause.
            runner = await start_app(
                app,
                host=host,
                port=port,
                path=path,
                sock=sock,
                shutdown_timeout=shutdown_timeout,
                keepalive_timeout=keepalive_timeout,
                ssl_context=ssl_context,
                print=print,
                backlog=backlog,
                access_log_class=access_log_class,
                access_log_format=access_log_format,
                access_log=access_log,
                handle_signals=handle_signals,
                reuse_address=reuse_address,
                reuse_port=reuse_port,
            )

            try:
                # sleep forever by 1 hour intervals,
                # on Windows before Python 3.8 wake up every 1 second to handle
                # Ctrl+C smoothly
                if sys.platform == "win32" and sys.version_info < (3, 8):
                    delay = 1
                else:
                    delay = 3600

                while True:
                    await asyncio.sleep(delay)
            finally:
                await runner.cleanup()

        main_task = loop.create_task(_run_app())
        loop.run_until_complete(main_task)
    except (GracefulExit, KeyboardInterrupt):  # pragma: no cover
        pass
    finally:
        _cancel_tasks({main_task}, loop)
        _cancel_tasks(asyncio.all_tasks(loop), loop)
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()
        asyncio.set_event_loop(None)

So as not to make this issue too big, I've pasted the entire file (if needed) on Mystbin: https://mystb.in/HalloweenPreferenceReunion.sql

I would open a pull request if I could. What can be done about this?

@Dreamsorcerer
Member

I'm not sure why you can't commit or make a PR (isn't the error you mention from linting? You should be able to commit and create a PR without passing the checks).

I'm not sure exactly what your use case is, but I think there are 2 types:

  1. You are running a web application directly, in which case the "some lines" you mention should be within that application (e.g. in a task created on startup).
  2. You are running some kind of tool which runs a web application within it (e.g. aiohttp-devtools). In this case, I'd look at how aiohttp-devtools handles it, by using AppRunner directly: https://github.com/aio-libs/aiohttp-devtools/blob/master/aiohttp_devtools/runserver/serve.py#L115-L140
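
As a rough illustration of the second option, here is a minimal sketch of starting an app through AppRunner and TCPSite and handing control back to the caller. The handler `hello`, the helper name `start_server`, and the host and port defaults are assumptions made for this example; they are not taken from aiohttp-devtools or from the patch above.

```python
import asyncio

from aiohttp import web


async def hello(request: web.Request) -> web.Response:
    # Hypothetical handler, only here so the app has something to serve.
    return web.Response(text="hello")


async def start_server(host: str = "127.0.0.1", port: int = 8080) -> web.AppRunner:
    """Start the app and return immediately; the caller owns the runner."""
    app = web.Application()
    app.router.add_get("/", hello)

    runner = web.AppRunner(app)
    await runner.setup()  # runs the app's startup handlers

    site = web.TCPSite(runner, host, port)
    await site.start()  # begins accepting connections, does not block
    return runner


async def main() -> None:
    runner = await start_server()
    try:
        # Start whatever else is needed here, then keep the loop alive.
        await asyncio.Event().wait()
    finally:
        await runner.cleanup()  # stops the sites and runs cleanup handlers
```

Running `asyncio.run(main())` would then serve until the process is interrupted. The caller, not aiohttp, decides how the loop is kept alive.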

Does that solve the problem, or is there something else needed?

@Bluenix2
Author

Bluenix2 commented Jan 8, 2021

I'm not sure why you can't commit or make a PR (isn't the error you mention from linting? You should be able to commit and create a PR without passing the checks).

It doesn't allow me to commit, the pre-commit hooks fail. When I run make fmt all files get that error:

FILE_THAT_FAILED: marked executable but has no (or invalid) shebang!
  If it isn't supposed to be executable, try: `chmod -x FILE_THAT_FAILED`
  If it is supposed to be executable, double-check its shebang.

I'm not sure exactly what your use case is,

My use-case is that I want to simply start the app locally but not stop execution there. I want to write code under the start that does a few other things (mostly start other things). And then handle "keeping it alive" (the while loop with a sleep) myself.

Does that solve the problem, or is there something else needed?

I guess I may want to try to use AppRunner myself (and basically copy the code inside my now changed start_app function), but I think what I changed would be what I want.

@Dreamsorcerer
Member

It doesn't allow me to commit, the pre-commit hooks fail. When I run make fmt all files get that error:

But you must have set up the pre-commit hooks yourself; Git doesn't set them up by default (as that would be dangerous). That is, the following:

git clone https://github.com/aio-libs/aiohttp.git
[edit something]
git commit -m 'I changed something'

Should successfully commit without running any hooks.

I believe the hooks live under .git/hooks/, so you could just delete them?

My use-case is that I want to simply start the app locally but not stop execution there. I want to write code under the start that does a few other things (mostly start other things). And then handle "keeping it alive" (the while loop with a sleep) myself.

That's still not very clear to me. Here's an example of things I do in my app which may or may not be similar.
The app has a subproject which runs on a separate server and communicates with the app. When running locally, I want to run this subproject locally as an extra process and stop it again when I stop the web app.
Another thing I do is check whether the installed packages match requirements.txt, and warn the developer if not.
One more is automatically building a JS bundle with npm.

To achieve these, I put it all in a module which is only used when the application is run locally; all the code is then still run within the application on startup. The requirements check is just run as a blocking function, the JS bundle build is run as a new Task in the background, and the subproject process is created in a cleanup_ctx, so it starts and stops with the application.
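
A minimal sketch of that approach, assuming a stand-in coroutine in place of the real npm build or subprocess: a cleanup_ctx async generator starts a background task when the app starts and stops it when the app shuts down. `EVENTS`, `build_bundle`, `dev_services` and `create_app` are hypothetical names used only for illustration.

```python
import asyncio
import contextlib
from typing import AsyncIterator, List

from aiohttp import web

# Hypothetical in-memory log, only so the ordering is visible in this sketch.
EVENTS: List[str] = []


async def build_bundle() -> None:
    """Stand-in for a background job such as building a JS bundle."""
    EVENTS.append("bundle started")
    await asyncio.sleep(0.01)  # placeholder for the real work
    EVENTS.append("bundle built")


async def dev_services(app: web.Application) -> AsyncIterator[None]:
    # Code before `yield` runs on application startup.
    EVENTS.append("startup")
    task = asyncio.create_task(build_bundle())

    yield

    # Code after `yield` runs on application cleanup.
    task.cancel()  # has no effect if the task has already finished
    with contextlib.suppress(asyncio.CancelledError):
        await task
    EVENTS.append("cleanup")


def create_app() -> web.Application:
    app = web.Application()
    app.cleanup_ctx.append(dev_services)
    return app
```

With this in place, `web.run_app(create_app())` keeps control of the loop, and the extra work starts and stops together with the application.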

Maybe this might be a better approach for whatever you are trying to achieve?

Does that solve the problem, or is there something else needed?

I guess I may want to try to use AppRunner myself (and basically copy the code inside my now changed start_app function), but I think what I changed would be what I want.

My first thought when looking at how that function works is that it seems complicated and not designed for public use. I'm not the one who'll make the call on whether the change gets merged or not, but my suspicion is that it would not be approved as is (although it's also difficult to tell without a diff).

@Rixxan

Rixxan commented Mar 27, 2022

Hello!

This sort of functionality is something that I'd really love to see worked into AIOHTTP in some way. Currently, to achieve the same end result, I end up spinning up two separate threads with a unique event loop for each: one thread for AIOHTTP and one for literally everything else, as I cannot run everything on the same event loop due to the nature of run_app.

I'd love to see an interface that would allow something along the lines of the following...

async def main(...):
  return await asyncio.gather(
    webserver.run_app(...),
    my_stuff(...),
  )

@Dreamsorcerer
Member

Dreamsorcerer commented Mar 27, 2022

As explained above, running other code does work. Nobody has come up with a valid use case yet which is not supported.

Either you are running a web app (the most common scenario), in which case aiohttp should control the loop and you should hook into the app's runtime, or you are writing a tool that happens to run a web app (e.g. aiohttp-devtools), in which case you should manage the loop and the lifetime of the app yourself by using the app runners.

Documentation for both:
https://docs.aiohttp.org/en/stable/web_advanced.html#complex-applications
https://docs.aiohttp.org/en/stable/web_advanced.html#application-runners
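
For the second case, a hedged sketch of running a server and other work on one event loop with the app runners might look like the following. `serve_forever` and `run_alongside` are hypothetical helper names, not aiohttp APIs, and the host and port defaults are assumptions.

```python
import asyncio
import contextlib
from typing import Awaitable, TypeVar

from aiohttp import web

T = TypeVar("T")


async def serve_forever(app: web.Application, host: str, port: int) -> None:
    """Serve `app` until this coroutine is cancelled, then clean up."""
    runner = web.AppRunner(app)
    await runner.setup()
    try:
        await web.TCPSite(runner, host, port).start()
        await asyncio.Event().wait()  # never set: waits until cancelled
    finally:
        await runner.cleanup()


async def run_alongside(
    app: web.Application,
    work: Awaitable[T],
    host: str = "127.0.0.1",
    port: int = 8080,
) -> T:
    """Serve `app` on the current loop while `work` runs, then stop serving."""
    server = asyncio.create_task(serve_forever(app, host, port))
    try:
        return await work
    finally:
        server.cancel()
        # Only cancellation is suppressed; a startup failure (for example a
        # port that is already in use) is re-raised here.
        with contextlib.suppress(asyncio.CancelledError):
            await server
```

Because `serve_forever` is an ordinary coroutine, it could equally be passed to `asyncio.gather()` next to other coroutines. One limitation of this sketch is that a server startup failure is only noticed once `work` has finished.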
