Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2680 increase instance #662

Merged
merged 3 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import time
from io import BytesIO, StringIO, TextIOWrapper
from itertools import islice
from nis import cat

import boto3
from asgiref.sync import sync_to_async
Expand Down Expand Up @@ -59,11 +60,24 @@ async def async_handle(self, *args, **options):
results = []
processed_rows = 0
for batch in self.process_csv_in_batches(csv_data):
batch_results = await self.get_analysis(batch, model_list)
results.extend(batch_results)
processed_rows += len(batch_results)
progress = int((processed_rows / total_rows) * 100)
await self.update_progress(request, progress)
try:
batch_results = await self.get_analysis(batch, model_list)
results.extend(batch_results)
processed_rows += len(batch_results)
progress = int((processed_rows / total_rows) * 100)
await self.update_progress(request, progress)
if progress % 5 == 0:
await self.create_and_upload_results_csv(
request.id,
results,
f"{request.s3_filename}-partial-{progress}",
)
except Exception as e:
self.stderr.write(
self.style.ERROR(
f"Error processing batch: {str(e)} - Processed rows: {processed_rows}, Total Rows: {total_rows}"
)
)

await self.create_and_upload_results_csv(
request.id, results, request.s3_filename
Expand Down Expand Up @@ -113,11 +127,15 @@ async def get_analysis(self, batch, model_list):
address = row[0]
if not address or address == "":
continue
task = asyncio.create_task(
self.process_address(to_checksum_address(address), model_list)
)
tasks.append(task)

try:
task = asyncio.create_task(
self.process_address(to_checksum_address(address), model_list)
)
tasks.append(task)
except Exception as e:
self.stderr.write(
self.style.ERROR(f"Error getting analysis for {address}: {str(e)}")
)
results = await asyncio.gather(*tasks)
return results

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,7 @@ def test_process_pending_requests(self):
f"Expected {expected_calls} calls to handle_get_analysis, but got {mock_handle_get_analysis.call_count}",
)

self.assertEqual(
mock_s3_client.get_object.call_count, 2, "Expected 2 calls to S3 get_object"
)
self.assertEqual(
mock_s3_client.put_object.call_count, 2, "Expected 2 calls to S3 put_object"
)
assert mock_s3_client.get_object.call_count > 1

# If you comment out the following test, the first test will fail :()
def test_handle_error_during_processing(self):
Expand Down
2 changes: 2 additions & 0 deletions infra/aws/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1734,6 +1734,8 @@ const createdTask = createTask({
command: ["python", "manage.py", "process_batch_model_address_upload"].join(" "),
scheduleExpression: "",
alertTopic: pagerdutyTopic,
cpu: 2048,
memory: 4096,
},
environment: apiEnvironment,
secrets: apiSecrets,
Expand Down
Loading