Skip to content

Commit

Permalink
2680 increase instance (#662)
Browse files: browse the repository at this point in the history
* feat: bump up instance size

* feat: improved error handling and incremental file writes

* feat: fix call count in test
  • Loading branch information
tim-schultz authored Aug 15, 2024
1 parent 1772aed commit 43e9ef8
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import time
from io import BytesIO, StringIO, TextIOWrapper
from itertools import islice
from nis import cat

import boto3
from asgiref.sync import sync_to_async
Expand Down Expand Up @@ -59,11 +60,24 @@ async def async_handle(self, *args, **options):
results = []
processed_rows = 0
for batch in self.process_csv_in_batches(csv_data):
batch_results = await self.get_analysis(batch, model_list)
results.extend(batch_results)
processed_rows += len(batch_results)
progress = int((processed_rows / total_rows) * 100)
await self.update_progress(request, progress)
try:
batch_results = await self.get_analysis(batch, model_list)
results.extend(batch_results)
processed_rows += len(batch_results)
progress = int((processed_rows / total_rows) * 100)
await self.update_progress(request, progress)
if progress % 5 == 0:
await self.create_and_upload_results_csv(
request.id,
results,
f"{request.s3_filename}-partial-{progress}",
)
except Exception as e:
self.stderr.write(
self.style.ERROR(
f"Error processing batch: {str(e)} - Processed rows: {processed_rows}, Total Rows: {total_rows}"
)
)

await self.create_and_upload_results_csv(
request.id, results, request.s3_filename
Expand Down Expand Up @@ -113,11 +127,15 @@ async def get_analysis(self, batch, model_list):
address = row[0]
if not address or address == "":
continue
task = asyncio.create_task(
self.process_address(to_checksum_address(address), model_list)
)
tasks.append(task)

try:
task = asyncio.create_task(
self.process_address(to_checksum_address(address), model_list)
)
tasks.append(task)
except Exception as e:
self.stderr.write(
self.style.ERROR(f"Error getting analysis for {address}: {str(e)}")
)
results = await asyncio.gather(*tasks)
return results

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,7 @@ def test_process_pending_requests(self):
f"Expected {expected_calls} calls to handle_get_analysis, but got {mock_handle_get_analysis.call_count}",
)

self.assertEqual(
mock_s3_client.get_object.call_count, 2, "Expected 2 calls to S3 get_object"
)
self.assertEqual(
mock_s3_client.put_object.call_count, 2, "Expected 2 calls to S3 put_object"
)
assert mock_s3_client.get_object.call_count > 1

# If you comment out the following test, the first test will fail :()
def test_handle_error_during_processing(self):
Expand Down
2 changes: 2 additions & 0 deletions infra/aws/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1734,6 +1734,8 @@ const createdTask = createTask({
command: ["python", "manage.py", "process_batch_model_address_upload"].join(" "),
scheduleExpression: "",
alertTopic: pagerdutyTopic,
cpu: 2048,
memory: 4096,
},
environment: apiEnvironment,
secrets: apiSecrets,
Expand Down

0 comments on commit 43e9ef8

Please sign in to comment.