Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2795 mbd batch targeted runs #666

Merged
merged 21 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 82 additions & 56 deletions api/registry/management/commands/process_batch_model_address_upload.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import csv
import json
import os
import time
from io import BytesIO, StringIO, TextIOWrapper
from itertools import islice
Expand All @@ -21,6 +22,8 @@
BULK_MODEL_SCORE_REQUESTS_RESULTS_FOLDER,
BULK_SCORE_REQUESTS_ADDRESS_LIST_FOLDER,
BULK_SCORE_REQUESTS_BUCKET_NAME,
S3_BUCKET,
S3_OBJECT_KEY,
)


Expand All @@ -33,71 +36,94 @@ def handle(self, *args, **options):
asyncio.run(self.async_handle(*args, **options))

async def async_handle(self, *args, **options):
pending_requests = await sync_to_async(list)(
BatchModelScoringRequest.objects.filter(
status=BatchRequestStatus.PENDING.value
)
self.stdout.write(f"Received bucket name: `{S3_BUCKET}`")
self.stdout.write(f"Received object key : `{S3_OBJECT_KEY}`")

s3_uri = f"s3://{S3_BUCKET}/{S3_OBJECT_KEY}"

# Find the request id from the filename.
filename = S3_OBJECT_KEY.split(
f"{BULK_SCORE_REQUESTS_ADDRESS_LIST_FOLDER}/"
)[-1]
self.stdout.write(f"Search request with filename: `{filename}`")

request = await sync_to_async(BatchModelScoringRequest.objects.get)(
s3_filename=filename
)

for request in pending_requests:
try:
self.stdout.write(f"Processing request: {request.id}")
self.stdout.write(f"Found request: {request.id}")
try:
self.stdout.write(f"Processing file: {s3_uri}")

file = await sync_to_async(self.download_from_s3)(request.s3_filename)
# file = await sync_to_async(self.download_from_s3)(s3_uri)
file = await sync_to_async(self.download_from_s3)(filename)

if file:
self.stdout.write(self.style.SUCCESS("Got stream, processing CSV"))
bytes = BytesIO(file.read())
text = TextIOWrapper(bytes, encoding="utf-8")
csv_data = csv.reader(text)
total_rows = sum(1 for row in csv_data)
if file:
self.stdout.write(self.style.SUCCESS("Got stream, processing CSV"))
bytes = BytesIO(file.read())
text = TextIOWrapper(bytes, encoding="utf-8")

csv_data = csv.reader(text)

text.seek(0)
csv_data = csv.reader(text)

model_list = request.model_list

results = []
processed_rows = 0
for batch in self.process_csv_in_batches(csv_data):
try:
batch_results = await self.get_analysis(batch, model_list)
results.extend(batch_results)
processed_rows += len(batch_results)
progress = int((processed_rows / total_rows) * 100)
await self.update_progress(request, progress)
if progress % 5 == 0:
await self.create_and_upload_results_csv(
request.id,
results,
f"{request.s3_filename}-partial-{progress}",
)
except Exception as e:
self.stderr.write(
self.style.ERROR(
f"Error processing batch: {str(e)} - Processed rows: {processed_rows}, Total Rows: {total_rows}"
)
# Check if the first row is a header
first_row = next(csv_data)
if first_row[0].lower() == "address":
# Skip the header and continue processing the CSV
total_rows = sum(1 for row in csv_data)
else:
# The first row is not a header, so include it in the processing
total_rows = 1 + sum(1 for row in csv_data) # Adding the first row already read

# Reset the reader to the start of the file or just after the header
text.seek(0)
csv_data = csv.reader(text)

# Skip the header again if it was determined to be a header
if first_row[0].lower() == "address":
next(csv_data)

model_list = request.model_list

results = []
processed_rows = 0
for batch in self.process_csv_in_batches(csv_data):
try:
batch_results = await self.get_analysis(batch, model_list)
results.extend(batch_results)
processed_rows += len(batch_results)
progress = int((processed_rows / total_rows) * 100)
await self.update_progress(request, progress)
if progress % 5 == 0:
await self.create_and_upload_results_csv(
request.id,
results,
f"{request.s3_filename}-partial-{progress}",
)
except Exception as e:
self.stderr.write(
self.style.ERROR(
f"Error processing batch: {str(e)} - Processed rows: {processed_rows}, Total Rows: {total_rows}"
)
)

await self.create_and_upload_results_csv(
request.id, results, request.s3_filename
)
await self.create_and_upload_results_csv(
request.id, results, request.s3_filename
)

# Update status to DONE
request.status = BatchRequestStatus.DONE
request.progress = 100
await sync_to_async(request.save)()
# Update status to DONE
request.status = BatchRequestStatus.DONE
request.progress = 100
await sync_to_async(request.save)()

self.stdout.write(
self.style.SUCCESS(f"Successfully processed request: {request.id}")
)
except Exception as e:
self.stderr.write(
self.style.ERROR(f"Error processing request {request.id}: {str(e)}")
)
# Optionally, update status to ERROR
request.status = BatchRequestStatus.ERROR
await sync_to_async(request.save)()
self.stdout.write(
self.style.SUCCESS(f"Successfully processed request: {request.id}")
)
except Exception as e:
self.stderr.write(
self.style.ERROR(f"Error processing file {s3_uri}: {str(e)}")
)
request.status = BatchRequestStatus.ERROR
await sync_to_async(request.save)()

async def update_progress(self, request, progress):
request.progress = progress
Expand Down
47 changes: 19 additions & 28 deletions api/registry/test/test_command_process_batch_address_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def test_process_pending_requests(self):
"ContentLength": 100,
"Body": MagicMock(
read=lambda: StringIO(
"0xd5680a051302d427efa518238fda2c848eebe714\n0xd5680a051302d427efa518238fda2c848eebe714\n0x0636F974D29d947d4946b2091d769ec6D2d415DE"
"Address\n0xd5680a051302d427efa518238fda2c848eebe714\n0xd5680a051302d427efa518238fda2c848eebe714\n0x0636F974D29d947d4946b2091d769ec6D2d415DE"
)
.getvalue()
.encode()
Expand All @@ -34,42 +34,33 @@ def test_process_pending_requests(self):
"registry.management.commands.process_batch_model_address_upload.handle_get_analysis",
mock_handle_get_analysis,
):
for i in range(2):
BatchModelScoringRequest.objects.create(
status=BatchRequestStatus.PENDING.value,
s3_filename=f"test_file_{i}.csv",
model_list=["model1", "model2"],
)

all_requests = BatchModelScoringRequest.objects.all()
self.assertEqual(
len(all_requests),
2,
f"Expected 2 requests, but found {len(all_requests)}",
good_request = BatchModelScoringRequest.objects.create(
status=BatchRequestStatus.PENDING.value,
s3_filename=f"test_file.csv",
model_list=["model1", "model2"],
)

call_command("process_batch_model_address_upload")

updated_request = BatchModelScoringRequest.objects.get(id=good_request.id)
self.assertEqual(
updated_request.status,
BatchRequestStatus.DONE.value,
f"Expected status DONE, but got {good_request.status}",
)
self.assertEqual(
updated_request.progress,
100,
f"Expected progress 100, but got {good_request.progress}",
)

for request in BatchModelScoringRequest.objects.all():
self.assertEqual(
request.status,
BatchRequestStatus.DONE.value,
f"Expected status DONE, but got {request.status}",
)
self.assertEqual(
request.progress,
100,
f"Expected progress 100, but got {request.progress}",
)

expected_calls = 6 # 2 files * 3 addresses each
expected_calls = 3 # 1 files * 3 addresses each
self.assertEqual(
mock_handle_get_analysis.call_count,
expected_calls,
f"Expected {expected_calls} calls to handle_get_analysis, but got {mock_handle_get_analysis.call_count}",
)

assert mock_s3_client.get_object.call_count > 1
assert mock_s3_client.get_object.call_count > 0

# If you comment out the following test, the first test will fail :()
def test_handle_error_during_processing(self):
Expand Down
4 changes: 4 additions & 0 deletions api/scorer/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,10 @@
"BULK_MODEL_SCORE_REQUESTS_RESULTS_FOLDER", default="model-score-results"
)
BULK_MODEL_SCORE_BATCH_SIZE = env("BULK_MODEL_SCORE_BATCH_SIZE", default=50)

S3_BUCKET = env("S3_BUCKET", default="bulk-score-requests")
S3_OBJECT_KEY = env("S3_OBJECT_KEY", default="test_file.csv")

DATA_SCIENCE_API_KEY = env("DATA_SCIENCE_API_KEY", default="abc")

VERIFIER_URL = env("VERIFIER_URL", default="http://localhost:8001/verifier/verify")
7 changes: 6 additions & 1 deletion infra/aws/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,12 @@ const serviceTaskRole = new aws.iam.Role("scorer-service-task-role", {
// S3 permissions
{
Effect: "Allow",
Action: ["s3:PutObject"],
Action: [
"s3:PutObject",
"s3:GetObject",
"s3:ListBucket",
"s3:ListObjects",
],
Resource: "*",
},
],
Expand Down
35 changes: 30 additions & 5 deletions infra/lib/scorer/s3_initiated_ecs_task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export function createS3InitiatedECSTask(
taskDefinitionArn: Output<string>,
subnetIds: Output<any>,
securityGroupIds: Output<string>[],
eventsStsAssumeRoleArn: Input<string>,
eventsStsAssumeRoleArn: Input<string>
) {
// Create S3 bucket
const bucket = new aws.s3.Bucket(bucketName, {
Expand Down Expand Up @@ -39,10 +39,12 @@ export function createS3InitiatedECSTask(
name: [bucketName],
},
object: {
key: [{
prefix: "address-lists/"
}]
}
key: [
{
prefix: "address-lists/",
},
],
},
},
}),
});
Expand All @@ -62,6 +64,29 @@ export function createS3InitiatedECSTask(
securityGroups: securityGroupIds,
},
},
inputTransformer: {
inputPaths: {
bucketName: "$.detail.bucket.name",
objectKey: "$.detail.object.key",
},
inputTemplate: JSON.stringify({
containerOverrides: [
{
name: "web", // Replace with actual container name or fetch dynamically
environment: [
{
name: "S3_BUCKET",
value: "<bucketName>",
},
{
name: "S3_OBJECT_KEY",
value: "<objectKey>",
},
],
},
],
}),
},
});

return {
Expand Down
Loading