aiohttp hangs when uploading a file during streaming download #10169

Closed
1 task done
javitonino opened this issue Dec 16, 2024 · 10 comments · Fixed by #10171

@javitonino (Contributor) commented Dec 16, 2024

Describe the bug

I've got some code that downloads a large archive file from S3 using aiobotocore, extracts it, and uploads each file to another key in the same bucket, all using iterators so we only work on small chunks of data at a time. We noticed that the process sometimes got stuck (maybe ~30% of the time) while doing so. No timeout or exception was raised, but the process never finished.

This was originally reported as an aiobotocore issue, but while debugging it we managed to reduce it to the minimal reproducing example below, using only aiohttp. The difference is that in this example a timeout is eventually raised, whereas the original code never raised one (or it might have been swallowed by our code).

To Reproduce

Run a LocalStack server as follows, then create a bucket and upload a 1 MiB test object:

docker run \
  --rm -it \
  -p 127.0.0.1:4566:4566 \
  -p 127.0.0.1:4510-4559:4510-4559 \
  -v /var/run/docker.sock:/var/run/docker.sock \
  localstack/localstack

dd if=/dev/zero of=ae4bbfcdc47f450aa8557abefeba4a5ct bs=1M count=1
aws --endpoint-url=http://localhost:4566 s3api create-bucket --bucket bucket --region local
aws --endpoint-url=http://localhost:4566 s3 cp ae4bbfcdc47f450aa8557abefeba4a5ct s3://bucket

Then run a streaming download with concurrent uploads:

import asyncio
import aiohttp

async def run():
    for _ in range(10):
        async with aiohttp.ClientSession() as session:
            response = await session.get(
                "http://localhost:4566/bucket/ae4bbfcdc47f450aa8557abefeba4a5ct",
            )
            i = 0
            async for chunk in response.content.iter_chunked(1024):
                i += 1

                print(f"chunk {i}")
                # The bug only manifests near the end of the download; this condition just
                # speeds up testing by skipping the earlier uploads. It reproduces without it,
                # it just takes longer.
                if i >= 900:
                    print("Streamed, time to upload")
                    # It hangs awaiting this, no timeout is raised
                    await session.put(
                        "http://localhost:4566/bucket/output/some_file",
                        data=b""
                    )
                    print("Uploaded")

asyncio.run(run())

Sometimes this will get stuck and eventually time out after 5 minutes (or whatever the timeout is set to).

Expected behavior

The script finishes successfully.

Logs/tracebacks

Traceback (most recent call last):
  File "venv/lib/python3.12/site-packages/aiohttp/client_reqrep.py", line 1055, in start
    message, payload = await protocol.read()  # type: ignore[union-attr]
                       ^^^^^^^^^^^^^^^^^^^^^
  File "venv/lib/python3.12/site-packages/aiohttp/streams.py", line 668, in read
    await self._waiter
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "replicate.py", line 24, in <module>
    asyncio.run(run())
  File "/usr/lib/python3.12/asyncio/runners.py", line 194, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "replicate.py", line 18, in run
    await session.put(
  File "venv/lib/python3.12/site-packages/aiohttp/client.py", line 728, in _request
    await resp.start(conn)
  File "venv/lib/python3.12/site-packages/aiohttp/client_reqrep.py", line 1050, in start
    with self._timer:
         ^^^^^^^^^^^
  File "venv/lib/python3.12/site-packages/aiohttp/helpers.py", line 671, in __exit__
    raise asyncio.TimeoutError from exc_val
TimeoutError

Python Version

3.12.7

aiohttp Version

3.11.10

Versions 3.7.0, 3.8.0, 3.9.0, 3.10.0, and 3.11.0 were also tested, with the same results.

multidict Version

4.7.6

propcache Version

0.2.1

yarl Version

1.18.3

OS

Tested in Arch Linux and Ubuntu 24.04

Related component

Client

Additional context

There is a bit more context in the original issue. The important points are:

  • It's a non-deterministic bug; it does not always happen, but with this script I managed to reproduce it in roughly 30% of tries.
  • When it hangs, it always does so near the end of the download, at exactly the same place in the code. In the original code, which downloads an archive of 123 files, it either failed when trying to upload the 110th file, or it didn't fail at all. Debugging it a bit, I believe that's the point at which the network download has completed but buffered data is still pending to be processed.
  • I attempted to reproduce against other services and was unable to do so. I can reproduce against S3 (the actual cloud service) and the LocalStack emulator, but not against MinIO (another S3 emulator).
  • As a workaround, I'm using two separate clients, one for the download and one for the uploads. This works fine and I'm not experiencing any problems this way (see the sketch after this list).
  • Putting breakpoints through the code makes it much harder to reproduce. I'm assuming it's some kind of timing bug and that pausing in the debugger makes it go away.
  • Looking at network dumps, it seems the issue manifests when the upload tries to reuse the download connection just after it finishes. At the network level everything looks correct: the request is sent and the response received, but aiohttp never returns (it times out with the trace above).
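
A minimal sketch of that two-client workaround, assuming "two separate clients" means two ClientSession instances, and reusing the LocalStack endpoint, object key, and loop structure from the reproduction script above (the upload key and empty body are placeholders). Because the uploads go through a separate session, they use their own connection pool and can never pick up the download's connection:

import asyncio

import aiohttp


async def run():
    async with aiohttp.ClientSession() as download_session, aiohttp.ClientSession() as upload_session:
        async with download_session.get(
            "http://localhost:4566/bucket/ae4bbfcdc47f450aa8557abefeba4a5ct"
        ) as response:
            i = 0
            async for chunk in response.content.iter_chunked(1024):
                i += 1
                if i >= 900:
                    # The PUT uses the upload session's pool, so it cannot reuse
                    # the (possibly paused) download connection.
                    async with upload_session.put(
                        "http://localhost:4566/bucket/output/some_file", data=b""
                    ) as upload_response:
                        upload_response.raise_for_status()


asyncio.run(run())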

Code of Conduct

  • I agree to follow the aio-libs Code of Conduct
@Dreamsorcerer (Member) commented:

Does it happen when using async with properly?

I think I saw something similar recently that looked like some kind of race condition that wasn't an issue when using the context manager.

@javitonino (Contributor, Author) commented:

Do you mean the request context manager like so:

import asyncio
import aiohttp

async def run():
    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(10)) as session:
        async with session.get("http://localhost:4566/bucket/ae4bbfcdc47f450aa8557abefeba4a5ct") as response:
            i = 0
            async for chunk in response.content.iter_chunked(1024):
                i += 1

                print(f"chunk number {i}")
                if i >= 900:
                    print("Streamed, time to upload")
                    # It hangs awaiting this
                    async with session.put("http://localhost:4566/bucket/output/some_file", data=b"") as response:
                        pass
                    print("Uploaded")

asyncio.run(run())

It still happens when using it like this.

@Dreamsorcerer (Member) commented:

Right, that's not related to this issue, then.

@javitonino (Contributor, Author) commented:

OK, I've been doing some more digging and adding debug messages here and there, and it seems the inconsistency is related to connection reuse.

If the download finishes quickly, the connection is released to the pool early. The upload then tries to reuse this connection and hangs as described above.

If the download is slower, the upload triggers before the connection is released to the pool. The upload then uses a new connection and returns it to the pool. Most of the time this means the upload on the next iteration picks up this newly released connection, and the process continues normally.

I think the problem is that when the connection is released to the pool (on download EOF) while the iteration is still ongoing, the connection is somehow unusable until the iteration ends. So if a request takes it out of the pool and blocks the iteration, you get a deadlock (the iteration waiting for the upload to complete, the upload waiting for the iteration to complete).
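
As a hedged suggestion that is not part of the original comment: one way to test the connection-reuse theory is to disable keep-alive entirely with aiohttp's TCPConnector(force_close=True), so every request opens a fresh connection and the PUT can never pick up the download's connection from the pool. If the hang disappears with this connector, reuse of the just-released download connection is strongly implicated. A minimal sketch, reusing the endpoint and key from the reproduction script:

import asyncio

import aiohttp


async def run():
    # force_close=True disables connection reuse: each request gets its own connection.
    connector = aiohttp.TCPConnector(force_close=True)
    async with aiohttp.ClientSession(connector=connector) as session:
        async with session.get(
            "http://localhost:4566/bucket/ae4bbfcdc47f450aa8557abefeba4a5ct"
        ) as response:
            i = 0
            async for chunk in response.content.iter_chunked(1024):
                i += 1
                if i >= 900:
                    async with session.put(
                        "http://localhost:4566/bucket/output/some_file", data=b""
                    ):
                        pass


asyncio.run(run())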

@Dreamsorcerer (Member) commented:

> I think the problem is that when the connection is released to the pool (on download EOF) while the iteration is still ongoing, the connection is somehow unusable until the iteration ends.

Hmm, I don't think there should be any relation between them that could cause that. I'll have to add some debugging to figure out exactly what code is reached and what condition is being waited on.

Another possibility is that the server is misbehaving and not handling connection reuse correctly.

@javitonino (Contributor, Author) commented Dec 16, 2024

Well, I think I found something, but I'm lacking context on how all of this works, so this explanation might not be entirely accurate.

It seems the connection can be returned to the pool with the protocol paused. The StreamReader pauses the protocol when its buffer is too full. An EOF can then be received while the protocol is still paused, triggering the EOF callbacks and returning the connection to the pool. This happens when the last network packet of the HTTP response happens to push the stream reader buffer over the limit.

In that case, the next request gets a connection in a paused state and is unable to read from it until it is resumed. I made a tentative fix that works for my example, by unpausing the protocol when receiving an EOF, e.g. adding this:

class StreamReader:
    ...
    def feed_eof(self):
        ...
        # If EOF arrived while the protocol was still paused, resume reading so the
        # connection is usable once it goes back to the pool.
        if self._protocol._reading_paused:
            self._protocol.resume_reading()

This seems to work OK, but I'm not sure it's the right approach. Another alternative would be to unpause the protocol directly in the pool code, either when returning a connection to the pool or when taking one out. For example, unpausing in ClientResponse._release_connection also works.

@Dreamsorcerer (Member) commented:

Excellent sleuthing. Either of those sounds like it might be a good solution. The first thing we need, though, is a regression test. Hopefully with that knowledge we can now write a complete test to add to the test suite.

@thehesiod (Contributor) commented:

These are the best bugs; thanks for taking the time to help track this one down!

@asvetlov (Member) commented:

Nice catch!

@javitonino (Contributor, Author) commented:

Thank you everyone! Indeed, this is one of the fun bugs to track down. At least, I had some fun with it :)

Anyway, I prepared a PR with the proposed fix and a couple of tests: #10171
