Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions backend/python/app/sources/external/s3/s3.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
from datetime import datetime
from typing import Any, Dict, List, Optional, Union

from app.sources.client.s3.s3 import S3Client, S3Response
from codeflash.code_utils.codeflash_wrap_decorator import \
codeflash_performance_async

try:
import aioboto3 # type: ignore
from botocore.exceptions import ClientError # type: ignore
except ImportError:
raise ImportError("aioboto3 is not installed. Please install it with `pip install aioboto3`")

from app.sources.client.s3.s3 import S3Client, S3Response


class S3DataSource:
Expand Down Expand Up @@ -48,13 +51,12 @@ def _handle_s3_response(self, response: object) -> S3Response:
return S3Response(success=False, error="Empty response from S3 API")

if isinstance(response, dict):
if 'Error' in response:
error_info = response['Error']
error_code = error_info.get('Code', 'Unknown')
error_message = error_info.get('Message', 'No message')
error = response.get('Error')
if error is not None:
error_code = error.get('Code', 'Unknown')
error_message = error.get('Message', 'No message')
return S3Response(success=False, error=f"{error_code}: {error_message}")
return S3Response(success=True, data=response)

return S3Response(success=True, data=response)

except Exception as e:
Expand Down Expand Up @@ -1520,6 +1522,7 @@ async def get_bucket_logging(self,
except Exception as e:
return S3Response(success=False, error=f"Unexpected error: {str(e)}")

@codeflash_performance_async
async def get_bucket_metrics_configuration(self,
Bucket: str,
Id: str,
Expand Down Expand Up @@ -4122,7 +4125,15 @@ async def upload_fileobj(self,
try:
session = await self._get_aioboto3_session()
async with session.client('s3') as s3_client:
response = await getattr(s3_client, 'upload_fileobj')(**kwargs)
# Run upload_fileobj in a separate thread to avoid any possible blocking from botocore internals (esp. fileobj implementations)
# This is async-safe as aioboto3's client is itself async, but Fileobj may be a synchronous stream
# If Fileobj is a truly async file, this isn't strictly needed, but for performance we can use to_thread conditionally

# Only offload to thread if Fileobj is a standard file (has .read and is not an async file type)
if hasattr(Fileobj, "read") and not hasattr(Fileobj, "read_async"):
response = await asyncio.to_thread(getattr(s3_client, 'upload_fileobj'), **kwargs)
else:
response = await getattr(s3_client, 'upload_fileobj')(**kwargs)
return self._handle_s3_response(response)
except ClientError as e:
error_code = e.response.get('Error', {}).get('Code', 'Unknown')
Expand Down