Conversation

@codeflash-ai codeflash-ai bot commented Nov 13, 2025

📄 27% (0.27x) speedup for S3DataSource.list_parts in backend/python/app/sources/external/s3/s3.py

⏱️ Runtime : 1.82 milliseconds → 1.44 milliseconds (best of 208 runs)

📝 Explanation and details

The optimized code achieves a **26% runtime improvement** by implementing **async S3 client connection pooling** instead of creating a new connection for each request.

**Key Optimization: Connection Reuse**

- **Original**: Creates a new `session.client('s3')` connection for every `list_parts()` call, requiring expensive connection setup/teardown
- **Optimized**: Caches the async S3 client in `_s3_async_client` and reuses it across multiple calls via `_get_s3_async_client()` (see the sketch below)
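A minimal sketch of this cached-client pattern, assuming `aioboto3` is available. The class name `S3DataSourceSketch` and the `asyncio.Lock` guard are illustrative additions, not the actual implementation in `backend/python/app/sources/external/s3/s3.py`:

```python
import asyncio

import aioboto3


class S3DataSourceSketch:
    """Sketch only: caches the aioboto3 S3 client so repeated calls skip connection setup."""

    def __init__(self, session: aioboto3.Session) -> None:
        self._session = session
        self._s3_client_cm = None         # async context manager from session.client('s3')
        self._s3_async_client = None      # the entered (cached) client
        self._last_session = None         # invalidate the cache if the session is swapped
        self._client_lock = asyncio.Lock()  # guard against concurrent first calls

    async def _get_s3_async_client(self):
        async with self._client_lock:
            if self._s3_async_client is None or self._last_session is not self._session:
                await self._close_s3_async_client()
                self._s3_client_cm = self._session.client("s3")
                self._s3_async_client = await self._s3_client_cm.__aenter__()
                self._last_session = self._session
            return self._s3_async_client

    async def _close_s3_async_client(self) -> None:
        if self._s3_client_cm is not None:
            await self._s3_client_cm.__aexit__(None, None, None)
            self._s3_client_cm = None
            self._s3_async_client = None

    async def list_parts(self, **kwargs):
        # Every call after the first reuses the cached client.
        s3_client = await self._get_s3_async_client()
        return await s3_client.list_parts(**kwargs)

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self._close_s3_async_client()
```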

**Why This Speeds Up Performance**
The line profiler shows the critical difference:

- **Original**: `async with session.client('s3') as s3_client:` takes 16.4% of total time (2.1293e+06 ns)
- **Optimized**: `s3_client = await self._get_s3_async_client()` is much faster as it returns the cached client after the first call

**Connection Management Benefits**

- **Session tracking**: Monitors session changes via `_last_session` to handle client invalidation properly
- **Proper cleanup**: `_close_s3_async_client()` and `__aexit__()` ensure resources are freed
- **Async context safety**: Uses `__aenter__()` and `__aexit__()` for proper async resource lifecycle (usage illustrated below)
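A hedged illustration of that lifecycle, reusing the hypothetical `S3DataSourceSketch` (and its imports) from the sketch above; bucket, key, and upload IDs are placeholders:

```python
async def demo_lifecycle() -> None:
    async with S3DataSourceSketch(aioboto3.Session()) as ds:
        await ds.list_parts(Bucket="my-bucket", Key="big-object", UploadId="upload-1")  # creates the client
        await ds.list_parts(Bucket="my-bucket", Key="big-object", UploadId="upload-2")  # reuses the client
    # __aexit__ has closed the cached client at this point
```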

**Test Case Performance**
The optimization particularly benefits:

- **Concurrent workloads**: Tests with 50-250 concurrent calls see reduced connection overhead
- **Repeated operations**: Sequential calls to `list_parts()` avoid repeated connection setup
- **High-throughput scenarios**: Multiple S3 operations in quick succession benefit from connection pooling

**Impact on Workloads**
This optimization is most effective when `list_parts()` is called frequently or in batches, as each subsequent call after the first avoids the expensive connection establishment process that aioboto3 normally performs.
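For instance, a batched workload like the following (again using the hypothetical sketch and its imports from above, with made-up identifiers) pays the connection cost once and amortizes it over every subsequent call:

```python
async def list_parts_for_uploads(upload_ids: list[str]) -> list[dict]:
    async with S3DataSourceSketch(aioboto3.Session()) as ds:
        # All calls after the first reuse the cached client, so the batch avoids
        # per-call connection setup/teardown.
        return await asyncio.gather(
            *(ds.list_parts(Bucket="my-bucket", Key="big-object", UploadId=uid)
              for uid in upload_ids)
        )
```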

Correctness verification report:

| Test | Status |
| --- | --- |
| ⚙️ Existing Unit Tests | 🔘 None Found |
| 🌀 Generated Regression Tests | 862 Passed |
| ⏪ Replay Tests | 🔘 None Found |
| 🔎 Concolic Coverage Tests | 🔘 None Found |
| 📊 Tests Coverage | 88.5% |
🌀 Generated Regression Tests and Runtime

import asyncio  # used to run async functions

# Mocks for S3Client, S3Response, aioboto3.Session, and exceptions

from typing import Optional

import pytest  # used for our unit tests
from app.sources.external.s3.s3 import S3DataSource

# --- End S3Response definition ---

# --- Begin ClientError mock ---

class ClientError(Exception):
    def __init__(self, response, operation_name):
        self.response = response
        self.operation_name = operation_name

# --- End ClientError mock ---

# --- Begin aioboto3.Session mock ---

class MockS3Client:
    def __init__(self, responses=None, raise_exc=None):
        self.responses = responses or {}
        self.raise_exc = raise_exc

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        pass

    async def list_parts(self, **kwargs):
        # If raise_exc is set, raise it
        if self.raise_exc:
            raise self.raise_exc
        # Return a response based on the key tuple of input parameters
        key = (kwargs.get('Bucket'), kwargs.get('Key'), kwargs.get('UploadId'))
        # If a custom response is set for this key, return it
        if key in self.responses:
            return self.responses[key]
        # Otherwise, return a default successful response
        return {
            'Parts': [{'PartNumber': 1, 'ETag': 'etag1'}, {'PartNumber': 2, 'ETag': 'etag2'}],
            'Bucket': kwargs.get('Bucket'),
            'Key': kwargs.get('Key'),
            'UploadId': kwargs.get('UploadId'),
            'Error': None
        }

class MockSession:
    def __init__(self, s3_client=None):
        self.s3_client = s3_client or MockS3Client()

    def client(self, service_name):
        return self.s3_client

# --- End aioboto3.Session mock ---

# --- Begin S3Client mock ---

class MockS3ClientWrapper:
    def __init__(self, session):
        self._session = session

    def get_session(self):
        return self._session

# --- End S3Client mock ---

# --- Begin S3DataSource definition (copied EXACTLY as provided) ---

try:
    import aioboto3  # type: ignore
    from botocore.exceptions import ClientError as RealClientError  # type: ignore
except ImportError:
    pass  # Ignore for test context

# --- End S3DataSource definition ---

# ------------------ UNIT TESTS ------------------

# ----------- BASIC TEST CASES -----------

@pytest.mark.asyncio
async def test_list_parts_basic_success():
    """Test basic async/await behavior and successful response."""
    # Setup: mock session returns a default success response
    session = MockSession()
    client = MockS3ClientWrapper(session)
    s3ds = S3DataSource(client)
    result = await s3ds.list_parts(Bucket="bucket", Key="key", UploadId="uploadid")

@pytest.mark.asyncio
async def test_list_parts_basic_optional_params():
    """Test passing optional parameters and receiving a successful response."""
    session = MockSession()
    client = MockS3ClientWrapper(session)
    s3ds = S3DataSource(client)
    result = await s3ds.list_parts(
        Bucket="bucket",
        Key="key",
        UploadId="uploadid",
        MaxParts=10,
        PartNumberMarker=5,
        RequestPayer="requester",
        ExpectedBucketOwner="owner",
        SSECustomerAlgorithm="AES256",
        SSECustomerKey="key123",
        SSECustomerKeyMD5="md5val"
    )

# ----------- EDGE TEST CASES -----------

@pytest.mark.asyncio
async def test_list_parts_error_response_dict():
    """Test handling of error response in dict format."""
    error_response = {
        'Error': {'Code': 'NoSuchUpload', 'Message': 'The specified upload does not exist'}
    }
    # Setup: mock session returns error dict for given params
    s3_client = MockS3Client(responses={
        ("bucket", "key", "uploadid"): error_response
    })
    session = MockSession(s3_client)
    client = MockS3ClientWrapper(session)
    s3ds = S3DataSource(client)
    result = await s3ds.list_parts(Bucket="bucket", Key="key", UploadId="uploadid")

@pytest.mark.asyncio
async def test_list_parts_none_response():
    """Test handling of None response from S3 client."""
    s3_client = MockS3Client(responses={
        ("bucket", "key", "uploadid"): None
    })
    session = MockSession(s3_client)
    client = MockS3ClientWrapper(session)
    s3ds = S3DataSource(client)
    result = await s3ds.list_parts(Bucket="bucket", Key="key", UploadId="uploadid")

@pytest.mark.asyncio
async def test_list_parts_client_error_exception():
    """Test handling of ClientError exception."""
    error_dict = {'Error': {'Code': 'AccessDenied', 'Message': 'Access Denied'}}
    exc = ClientError(response=error_dict, operation_name="ListParts")
    s3_client = MockS3Client(raise_exc=exc)
    session = MockSession(s3_client)
    client = MockS3ClientWrapper(session)
    s3ds = S3DataSource(client)
    result = await s3ds.list_parts(Bucket="bucket", Key="key", UploadId="uploadid")

@pytest.mark.asyncio
async def test_list_parts_unexpected_exception():
    """Test handling of unexpected exception."""
    exc = RuntimeError("Unexpected failure")
    s3_client = MockS3Client(raise_exc=exc)
    session = MockSession(s3_client)
    client = MockS3ClientWrapper(session)
    s3ds = S3DataSource(client)
    result = await s3ds.list_parts(Bucket="bucket", Key="key", UploadId="uploadid")

@pytest.mark.asyncio
async def test_list_parts_concurrent_execution():
    """Test concurrent execution of list_parts for different uploads."""
    responses = {
        ("bucket", "key", "uploadid1"): {'Parts': [{'PartNumber': 1}], 'Bucket': "bucket", 'Key': "key", 'UploadId': "uploadid1"},
        ("bucket", "key", "uploadid2"): {'Parts': [{'PartNumber': 2}], 'Bucket': "bucket", 'Key': "key", 'UploadId': "uploadid2"}
    }
    s3_client = MockS3Client(responses=responses)
    session = MockSession(s3_client)
    client = MockS3ClientWrapper(session)
    s3ds = S3DataSource(client)
    # Run two calls concurrently
    results = await asyncio.gather(
        s3ds.list_parts(Bucket="bucket", Key="key", UploadId="uploadid1"),
        s3ds.list_parts(Bucket="bucket", Key="key", UploadId="uploadid2")
    )

# ----------- LARGE SCALE TEST CASES -----------

@pytest.mark.asyncio
async def test_list_parts_large_scale_concurrent():
    """Test large scale concurrent execution of list_parts."""
    # Create 50 unique responses for 50 unique UploadIds
    responses = {
        ("bucket", "key", f"uploadid{i}"): {
            'Parts': [{'PartNumber': i}], 'Bucket': "bucket", 'Key': "key", 'UploadId': f"uploadid{i}"
        }
        for i in range(50)
    }
    s3_client = MockS3Client(responses=responses)
    session = MockSession(s3_client)
    client = MockS3ClientWrapper(session)
    s3ds = S3DataSource(client)
    # Run 50 calls concurrently
    coros = [
        s3ds.list_parts(Bucket="bucket", Key="key", UploadId=f"uploadid{i}")
        for i in range(50)
    ]
    results = await asyncio.gather(*coros)
    for i, result in enumerate(results):
        pass

# ----------- THROUGHPUT TEST CASES -----------

@pytest.mark.asyncio
async def test_list_parts_throughput_small_load():
    """Test throughput under small load (10 concurrent calls)."""
    responses = {
        ("bucket", "key", f"uploadid{i}"): {
            'Parts': [{'PartNumber': i}], 'Bucket': "bucket", 'Key': "key", 'UploadId': f"uploadid{i}"
        }
        for i in range(10)
    }
    s3_client = MockS3Client(responses=responses)
    session = MockSession(s3_client)
    client = MockS3ClientWrapper(session)
    s3ds = S3DataSource(client)
    coros = [
        s3ds.list_parts(Bucket="bucket", Key="key", UploadId=f"uploadid{i}")
        for i in range(10)
    ]
    results = await asyncio.gather(*coros)

@pytest.mark.asyncio
async def test_list_parts_throughput_medium_load():
    """Test throughput under medium load (100 concurrent calls)."""
    responses = {
        ("bucket", "key", f"uploadid{i}"): {
            'Parts': [{'PartNumber': i}], 'Bucket': "bucket", 'Key': "key", 'UploadId': f"uploadid{i}"
        }
        for i in range(100)
    }
    s3_client = MockS3Client(responses=responses)
    session = MockSession(s3_client)
    client = MockS3ClientWrapper(session)
    s3ds = S3DataSource(client)
    coros = [
        s3ds.list_parts(Bucket="bucket", Key="key", UploadId=f"uploadid{i}")
        for i in range(100)
    ]
    results = await asyncio.gather(*coros)

@pytest.mark.asyncio
async def test_list_parts_throughput_high_volume():
    """Test throughput under high volume (250 concurrent calls)."""
    responses = {
        ("bucket", "key", f"uploadid{i}"): {
            'Parts': [{'PartNumber': i}], 'Bucket': "bucket", 'Key': "key", 'UploadId': f"uploadid{i}"
        }
        for i in range(250)
    }
    s3_client = MockS3Client(responses=responses)
    session = MockSession(s3_client)
    client = MockS3ClientWrapper(session)
    s3ds = S3DataSource(client)
    coros = [
        s3ds.list_parts(Bucket="bucket", Key="key", UploadId=f"uploadid{i}")
        for i in range(250)
    ]
    results = await asyncio.gather(*coros)
    # Check that each UploadId is correct
    for i, result in enumerate(results):
        pass

codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

#------------------------------------------------
import asyncio # used to run async functions
from unittest.mock import AsyncMock, MagicMock, patch

import pytest # used for our unit tests
from app.sources.external.s3.s3 import S3DataSource

class S3Client:
    """Stub for S3Client used by S3DataSource."""
    def __init__(self, session):
        self._session = session

    def get_session(self):
        return self._session

# --- Patch aioboto3.Session and botocore.exceptions.ClientError for test isolation ---

class FakeClientError(Exception):
    """Fake ClientError for simulating botocore.exceptions.ClientError."""
    def __init__(self, response, operation_name):
        self.response = response
        self.operation_name = operation_name

# --- The function under test (copied as required, DO NOT MODIFY) ---

try:
    import aioboto3  # type: ignore
    from botocore.exceptions import ClientError  # type: ignore
except ImportError:
    # For testing, patch these in sys.modules
    import sys

    import aioboto3
    from botocore.exceptions import ClientError

# --- Test suite for S3DataSource.list_parts ---

@pytest.fixture
def fake_s3_session():
    """Fixture to provide a fake aioboto3 session with async context manager client."""
    class FakeS3Client:
        def __init__(self, response=None, raise_exc=None):
            self._response = response
            self._raise_exc = raise_exc

        async def list_parts(self, **kwargs):
            # Simulate raising exception if specified
            if self._raise_exc:
                raise self._raise_exc
            return self._response

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            pass

    class FakeSession:
        def __init__(self, response=None, raise_exc=None):
            self._response = response
            self._raise_exc = raise_exc
            self._client_args = []

        def client(self, service_name):
            self._client_args.append(service_name)
            return FakeS3Client(response=self._response, raise_exc=self._raise_exc)

    return FakeSession

# ------------------ BASIC TEST CASES ------------------

@pytest.mark.asyncio
async def test_list_parts_basic_success(fake_s3_session):
    """Test that list_parts returns success for a typical S3 response."""
    # Simulate a typical S3 response dict
    s3_response = {'Parts': [{'PartNumber': 1, 'ETag': '"etag1"'}]}
    session = fake_s3_session(response=s3_response)
    s3_client = S3Client(session)
    ds = S3DataSource(s3_client)
    result = await ds.list_parts(Bucket="bucket", Key="key", UploadId="uploadid")

@pytest.mark.asyncio
async def test_list_parts_basic_error_response(fake_s3_session):
    """Test that list_parts returns error when S3 returns an error dict."""
    s3_response = {'Error': {'Code': 'NoSuchUpload', 'Message': 'The specified upload does not exist.'}}
    session = fake_s3_session(response=s3_response)
    s3_client = S3Client(session)
    ds = S3DataSource(s3_client)
    result = await ds.list_parts(Bucket="bucket", Key="key", UploadId="uploadid")

@pytest.mark.asyncio
async def test_list_parts_basic_none_response(fake_s3_session):
    """Test that list_parts handles None response gracefully."""
    session = fake_s3_session(response=None)
    s3_client = S3Client(session)
    ds = S3DataSource(s3_client)
    result = await ds.list_parts(Bucket="bucket", Key="key", UploadId="uploadid")

@pytest.mark.asyncio
async def test_list_parts_basic_async_behavior(fake_s3_session):
    """Test that the function is properly awaitable and returns a coroutine."""
    s3_response = {'Parts': []}
    session = fake_s3_session(response=s3_response)
    s3_client = S3Client(session)
    ds = S3DataSource(s3_client)
    codeflash_output = ds.list_parts(Bucket="bucket", Key="key", UploadId="uploadid"); coro = codeflash_output
    result = await coro

# ------------------ EDGE TEST CASES ------------------

@pytest.mark.asyncio
async def test_list_parts_concurrent_calls(fake_s3_session):
    """Test concurrent execution of multiple list_parts calls."""
    s3_response = {'Parts': [{'PartNumber': 1}]}
    session = fake_s3_session(response=s3_response)
    s3_client = S3Client(session)
    ds = S3DataSource(s3_client)
    # 5 concurrent calls with different UploadIds
    tasks = [
        ds.list_parts(Bucket="bucket", Key="key", UploadId=f"uploadid_{i}")
        for i in range(5)
    ]
    results = await asyncio.gather(*tasks)
    for result in results:
        pass

@pytest.mark.asyncio
async def test_list_parts_handles_unexpected_exception(fake_s3_session):
    """Test that list_parts handles unexpected exceptions gracefully."""
    class SomeOtherException(Exception):
        pass
    session = fake_s3_session(raise_exc=SomeOtherException("unexpected!"))
    s3_client = S3Client(session)
    ds = S3DataSource(s3_client)
    result = await ds.list_parts(Bucket="bucket", Key="key", UploadId="uploadid")

@pytest.mark.asyncio
async def test_list_parts_optional_parameters(fake_s3_session):
    """Test that all optional parameters are passed correctly."""
    s3_response = {'Parts': []}
    session = fake_s3_session(response=s3_response)
    s3_client = S3Client(session)
    ds = S3DataSource(s3_client)
    # All optional params supplied
    result = await ds.list_parts(
        Bucket="bucket",
        Key="key",
        UploadId="uploadid",
        MaxParts=10,
        PartNumberMarker=2,
        RequestPayer="requester",
        ExpectedBucketOwner="owner",
        SSECustomerAlgorithm="AES256",
        SSECustomerKey="key",
        SSECustomerKeyMD5="md5"
    )

# ------------------ LARGE SCALE TEST CASES ------------------

@pytest.mark.asyncio
async def test_list_parts_large_scale_concurrent(fake_s3_session):
    """Test list_parts under high concurrency (50 concurrent calls)."""
    s3_response = {'Parts': [{'PartNumber': i} for i in range(1, 4)]}
    session = fake_s3_session(response=s3_response)
    s3_client = S3Client(session)
    ds = S3DataSource(s3_client)
    # 50 concurrent calls
    tasks = [
        ds.list_parts(Bucket="bucket", Key="key", UploadId=f"uploadid_{i}")
        for i in range(50)
    ]
    results = await asyncio.gather(*tasks)
    for result in results:
        pass

# ------------------ THROUGHPUT TEST CASES ------------------

@pytest.mark.asyncio
async def test_list_parts_throughput_small_load(fake_s3_session):
    """Throughput test: small load (10 calls)."""
    s3_response = {'Parts': [{'PartNumber': 1}]}
    session = fake_s3_session(response=s3_response)
    s3_client = S3Client(session)
    ds = S3DataSource(s3_client)
    tasks = [ds.list_parts(Bucket="bucket", Key="key", UploadId=f"up_{i}") for i in range(10)]
    results = await asyncio.gather(*tasks)

@pytest.mark.asyncio
async def test_list_parts_throughput_medium_load(fake_s3_session):
    """Throughput test: medium load (100 calls)."""
    s3_response = {'Parts': [{'PartNumber': 1}]}
    session = fake_s3_session(response=s3_response)
    s3_client = S3Client(session)
    ds = S3DataSource(s3_client)
    tasks = [ds.list_parts(Bucket="bucket", Key="key", UploadId=f"up_{i}") for i in range(100)]
    results = await asyncio.gather(*tasks)

@pytest.mark.asyncio
async def test_list_parts_throughput_large_load(fake_s3_session):
    """Throughput test: large load (200 calls, still under 1000)."""
    s3_response = {'Parts': [{'PartNumber': 1}]}
    session = fake_s3_session(response=s3_response)
    s3_client = S3Client(session)
    ds = S3DataSource(s3_client)
    tasks = [ds.list_parts(Bucket="bucket", Key="key", UploadId=f"up_{i}") for i in range(200)]
    results = await asyncio.gather(*tasks)

@pytest.mark.asyncio
async def test_list_parts_throughput_varied_inputs(fake_s3_session):
    """Throughput test: varied UploadIds and Keys."""
    s3_response = {'Parts': [{'PartNumber': 1}]}
    session = fake_s3_session(response=s3_response)
    s3_client = S3Client(session)
    ds = S3DataSource(s3_client)
    tasks = [
        ds.list_parts(
            Bucket="bucket",
            Key=f"key_{i%5}",
            UploadId=f"uploadid_{i}"
        )
        for i in range(50)
    ]
    results = await asyncio.gather(*tasks)
    # Ensure all responses are as expected
    for r in results:
        pass

codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

To edit these changes, `git checkout codeflash/optimize-S3DataSource.list_parts-mhx58tj2` and push.

@codeflash-ai codeflash-ai bot requested a review from mashraf-222 November 13, 2025 08:05
@codeflash-ai codeflash-ai bot added ⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: High Optimization Quality according to Codeflash labels Nov 13, 2025