⚡️ Speed up method S3DataSource.upload_fileobj by 8%
#635
📄 8% (0.08x) speedup for `S3DataSource.upload_fileobj` in `backend/python/app/sources/external/s3/s3.py`

⏱️ Runtime: 1.32 milliseconds → 1.23 milliseconds (best of 218 runs)

📝 Explanation and details
The optimization adds intelligent thread offloading for S3 file uploads, using `asyncio.to_thread()` to prevent event-loop blocking when dealing with synchronous file objects.

**Key Change:**

The optimized code conditionally uses `asyncio.to_thread()` for the `upload_fileobj` operation when the `Fileobj` parameter is a standard synchronous file-like object (it has `.read()` but not `.read_async()`). For truly async files, it continues using the direct async path.

**Why This Improves Performance:**

- **Event Loop Protection:** Standard file objects (like `io.BytesIO` or regular files) perform blocking I/O during upload, which can stall the async event loop. Offloading these uploads to a thread pool keeps the event loop responsive.
- **Parallelism Benefits:** Thread offloading lets the blocking file-read work run in a worker thread alongside other async operations, improving overall throughput, especially under concurrent load.
- **Smart Path Selection:** The optimization applies thread offloading only when needed (synchronous files), preserving the optimal direct async path for truly async file objects; a sketch of this dispatch appears after this list.
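Since the PR body does not include the diff itself, the following is a minimal sketch of that dispatch, assuming a boto3-style sync client for the threaded path and an aioboto3-style async client for the direct path. The helper name `_is_sync_fileobj` and both client parameters are illustrative assumptions, not the actual patch:

```python
# Minimal sketch of the conditional thread-offloading dispatch described
# above -- illustrative only, not the actual patch from this PR.
import asyncio


def _is_sync_fileobj(fileobj) -> bool:
    """Treat objects exposing .read() but not .read_async() as synchronous."""
    return hasattr(fileobj, "read") and not hasattr(fileobj, "read_async")


async def upload_fileobj(sync_client, async_client, fileobj, bucket, key):
    if _is_sync_fileobj(fileobj):
        # A synchronous file would block the event loop during reads, so
        # run the whole upload in a worker thread.
        return await asyncio.to_thread(
            sync_client.upload_fileobj, fileobj, bucket, key
        )
    # Truly async file objects keep the direct async path.
    return await async_client.upload_fileobj(fileobj, bucket, key)
```

The attribute probe is deliberately cheap: it inspects the object once per call, so the fast path for async files pays essentially nothing for the check.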
**Performance Impact:**

**Test Case Benefits:**

The optimization particularly benefits test cases involving multiple concurrent uploads (`test_upload_fileobj_large_scale_concurrent_uploads` and `test_upload_fileobj_throughput_large_load`), where preventing event-loop blocking is crucial for maintaining high concurrent throughput with standard `io.BytesIO` objects.
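To make the event-loop point concrete, here is a small self-contained demo (not part of the PR or its test suite) that compares how long a heartbeat coroutine is starved when a blocking read runs inline on the loop versus offloaded to a thread:

```python
# Standalone demo (not from the PR): measure event-loop responsiveness
# with a blocking call run inline vs. offloaded via asyncio.to_thread().
import asyncio
import time


def blocking_read() -> None:
    time.sleep(0.2)  # stands in for a blocking file read


async def max_heartbeat_gap(upload) -> float:
    ticks: list[float] = []

    async def heartbeat() -> None:
        for _ in range(20):
            ticks.append(time.monotonic())
            await asyncio.sleep(0.02)

    hb = asyncio.create_task(heartbeat())
    await asyncio.sleep(0.05)  # let a few heartbeats land first
    await upload()
    await hb
    return max(b - a for a, b in zip(ticks, ticks[1:]))


async def main() -> None:
    async def inline() -> None:
        blocking_read()  # runs on the event loop: heartbeats stall

    async def offloaded() -> None:
        await asyncio.to_thread(blocking_read)  # loop stays responsive

    print(f"inline:    {await max_heartbeat_gap(inline) * 1000:.0f} ms max gap")
    print(f"offloaded: {await max_heartbeat_gap(offloaded) * 1000:.0f} ms max gap")


asyncio.run(main())
```

On a typical machine the inline variant shows a maximum gap close to the full 200 ms block, while the offloaded variant stays near the 20 ms heartbeat interval.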
✅ Correctness verification report:
🌀 Generated Regression Tests and Runtime
```python
import asyncio
import io

import pytest

from app.sources.external.s3.s3 import S3DataSource


class DummyS3Client:
    """Dummy S3Client for testing S3DataSource"""
    def __init__(self, session):
        self._session = session

    def get_session(self):
        return self._session


class DummyS3Session:
    """Dummy aioboto3 Session for testing"""
    def __init__(self, upload_behavior=None):
        self.upload_behavior = upload_behavior or {}

    async def __aenter__(self):
        return DummyS3ClientObj(self.upload_behavior)

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        pass

    def client(self, service_name):
        return self


class DummyS3ClientObj:
    """Mock S3 client object with upload_fileobj method"""
    def __init__(self, upload_behavior):
        self.upload_behavior = upload_behavior

    async def upload_fileobj(self, Fileobj, Bucket, Key, **kwargs):
        # Simulate different behaviors based on test setup
        if self.upload_behavior.get('raise_client_error'):
            # Simulate botocore.exceptions.ClientError
            class DummyClientError(Exception):
                def __init__(self):
                    self.response = {'Error': {'Code': 'NoSuchBucket', 'Message': 'The specified bucket does not exist'}}
            raise DummyClientError()
        if self.upload_behavior.get('raise_exception'):
            raise RuntimeError("Unexpected error")
        if self.upload_behavior.get('return_none'):
            return None
        if self.upload_behavior.get('return_error_dict'):
            return {'Error': {'Code': 'AccessDenied', 'Message': 'Permission denied'}}
        # Simulate success (return a dict with some info)
        return {'ETag': '"abc123"', 'Bucket': Bucket, 'Key': Key,
                'Size': Fileobj.getbuffer().nbytes if hasattr(Fileobj, 'getbuffer') else None}


# The function under test (copied exactly as provided)
try:
    import aioboto3  # type: ignore
    from botocore.exceptions import ClientError  # type: ignore
except ImportError:
    pass  # For test purposes, we use dummy classes above


# ------------------ UNIT TESTS ------------------

# 1. Basic Test Cases

@pytest.mark.asyncio
async def test_upload_fileobj_empty_fileobj():
    """Test upload with empty fileobj."""
    fileobj = io.BytesIO(b"")
    session = DummyS3Session()
    s3_client = DummyS3Client(session)
    datasource = S3DataSource(s3_client)
    response = await datasource.upload_fileobj(fileobj, "bucket", "key")


@pytest.mark.asyncio
async def test_upload_fileobj_none_response():
    """Test when underlying S3 returns None (simulating empty response)."""
    session = DummyS3Session(upload_behavior={'return_none': True})
    s3_client = DummyS3Client(session)
    datasource = S3DataSource(s3_client)
    fileobj = io.BytesIO(b"data")
    response = await datasource.upload_fileobj(fileobj, "bucket", "key")


@pytest.mark.asyncio
async def test_upload_fileobj_error_dict_response():
    """Test when S3 returns error dictionary."""
    session = DummyS3Session(upload_behavior={'return_error_dict': True})
    s3_client = DummyS3Client(session)
    datasource = S3DataSource(s3_client)
    fileobj = io.BytesIO(b"data")
    response = await datasource.upload_fileobj(fileobj, "bucket", "key")


@pytest.mark.asyncio
async def test_upload_fileobj_client_error_handling():
    """Test handling of botocore.exceptions.ClientError."""
    session = DummyS3Session(upload_behavior={'raise_client_error': True})
    s3_client = DummyS3Client(session)
    datasource = S3DataSource(s3_client)
    fileobj = io.BytesIO(b"data")
    response = await datasource.upload_fileobj(fileobj, "bucket", "key")


@pytest.mark.asyncio
async def test_upload_fileobj_unexpected_exception_handling():
    """Test handling of unexpected exception."""
    session = DummyS3Session(upload_behavior={'raise_exception': True})
    s3_client = DummyS3Client(session)
    datasource = S3DataSource(s3_client)
    fileobj = io.BytesIO(b"data")
    response = await datasource.upload_fileobj(fileobj, "bucket", "key")


@pytest.mark.asyncio
async def test_upload_fileobj_large_scale_concurrent_uploads():
    """Test many concurrent uploads to check scalability and async behavior."""
    session = DummyS3Session()
    s3_client = DummyS3Client(session)
    datasource = S3DataSource(s3_client)
    num_uploads = 50  # bounded for quick test
    fileobjs = [io.BytesIO(b"x" * (i + 1)) for i in range(num_uploads)]
    tasks = [
        datasource.upload_fileobj(fileobjs[i], "bucket", f"key{i}")
        for i in range(num_uploads)
    ]
    results = await asyncio.gather(*tasks)
    for i, response in enumerate(results):
        pass


# 4. Throughput Test Cases

@pytest.mark.asyncio
async def test_upload_fileobj_throughput_large_load():
    """Throughput test: large load, larger file size."""
    session = DummyS3Session()
    s3_client = DummyS3Client(session)
    datasource = S3DataSource(s3_client)
    fileobjs = [io.BytesIO(b"x" * 4096) for _ in range(40)]  # 4KB files
    tasks = [
        datasource.upload_fileobj(fileobjs[i], "bucket", f"key{i}")
        for i in range(40)
    ]
    results = await asyncio.gather(*tasks)
    for r in results:
        pass
```
`codeflash_output` is used to check that the output of the original code is the same as that of the optimized code.
```python
# ------------------------------------------------
import asyncio  # used to run async functions
from io import BytesIO
from unittest.mock import AsyncMock, MagicMock, patch

import pytest  # used for our unit tests

from app.sources.external.s3.s3 import S3DataSource


# --- Minimal stubs for S3Response and S3Client ---
class S3Response:
    """Minimal S3Response class for testing."""
    def __init__(self, success: bool, data=None, error=None):
        self.success = success
        self.data = data
        self.error = error


# --- Function under test (copied exactly as provided) ---
try:
    import aioboto3  # type: ignore
    from botocore.exceptions import ClientError  # type: ignore
except ImportError:
    # For test environment, we patch aioboto3 and ClientError below
    pass


# --- Helper: Mock S3Client and aioboto3 Session ---
class DummySession:
    """Mock aioboto3.Session object."""
    def __init__(self, client_mock):
        self._client_mock = client_mock


class DummyS3Client:
    """Mock S3Client for S3DataSource."""
    def __init__(self, session):
        self._session = session


# --- Basic Test Cases ---
@pytest.mark.asyncio
```
To edit these changes, run `git checkout codeflash/optimize-S3DataSource.upload_fileobj-mhxd8va5` and push.