BSE-4340: Spark benchmark EMR terraform (#50)
IsaacWarren authored Dec 12, 2024
1 parent f6ae9b8 commit 86d3a48
Showing 4 changed files with 364 additions and 1 deletion.
95 changes: 95 additions & 0 deletions benchmarks/spark/README.md
@@ -0,0 +1,95 @@
# Spark NYC Taxi Precipitation Benchmark on EMR

This Terraform configuration provisions an EMR cluster, uploads the benchmark script to S3, and runs it as an EMR step.

---

## Prerequisites

1. **AWS CLI**: Installed and configured with access keys.
2. **Terraform**: Installed on your local machine.
3. **jq**: Installed on your local machine.
4. **gzip**: Installed on your local machine.

---

## Steps to Use the Terraform Script

### 1. Enter the benchmarks/spark Directory
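
From the repository root:

```bash
cd benchmarks/spark
```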

### 2. Initialize Terraform

Run the following command to initialize the Terraform project:

```bash
terraform init
```

### 3. Deploy the Infrastructure

Apply the Terraform script to deploy the resources:

```bash
terraform apply
```

You will be prompted to confirm the deployment. Type `yes` to proceed.
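
To skip the interactive prompt (e.g. for non-interactive runs), Terraform's `-auto-approve` flag can be used instead:

```bash
terraform apply -auto-approve
```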

### 4. Verify Deployment

After the deployment completes, the EMR cluster and its associated resources are provisioned, and the Python script has been uploaded to the newly created S3 bucket.
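
One way to check the cluster state from the CLI, using the outputs defined in `emr.tf` (a ready cluster reports `WAITING` or `RUNNING`):

```bash
aws emr describe-cluster \
  --cluster-id "$(terraform output --json | jq -r '.emr_cluster_id.value')" \
  --region "$(terraform output --json | jq -r '.emr_cluster_region.value')" \
  --query 'Cluster.Status.State' --output text
```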

---

## Running the Python Script

The script will automatically run on the EMR cluster as a step during the deployment.
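
To inspect the step from the CLI, you can run the same lookup that `wait_for_step.sh` performs:

```bash
aws emr list-steps \
  --cluster-id "$(terraform output --json | jq -r '.emr_cluster_id.value')" \
  --region "$(terraform output --json | jq -r '.emr_cluster_region.value')"
```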

---

## Waiting for the Script to Complete

Run the following command to wait for the script to complete:

```bash
./wait_for_step.sh
```

This will take a few minutes.

---

## Retrieving Logs

### 1. Download Logs Using AWS CLI

Use the AWS CLI to download the logs:

```bash
aws s3 cp \
  "s3://$(terraform output --json | jq -r '.s3_bucket_id.value')/logs/$(terraform output --json | jq -r '.emr_cluster_id.value')" \
  ./emr-logs --recursive \
  --region "$(terraform output --json | jq -r '.emr_cluster_region.value')"
```

### 2. Explore the Logs

Logs are structured into the following directories:

- `steps/`: Logs for each step.
- `node/`: Logs for individual cluster nodes.
- `applications/`: Logs for applications like Spark.

Example commands to decompress and view the step logs, whose output includes the benchmark's reported execution time:

```bash
gzip -d ./emr-logs/steps/*/*
cat ./emr-logs/steps/*/stdout
```

---

## Cleaning Up

To destroy the deployed resources and avoid incurring additional costs, run:

```bash
terraform destroy
```

You will be prompted to confirm. Type `yes` to proceed.
250 changes: 250 additions & 0 deletions benchmarks/spark/emr.tf
@@ -0,0 +1,250 @@
provider "aws" {
region = "us-east-2"
}

resource "aws_s3_bucket" "emr_bucket" {
bucket = "spark-benchmark-python-script-bucket-${random_id.bucket_id.hex}"
force_destroy = true
}

resource "random_id" "bucket_id" {
byte_length = 8
}

resource "aws_s3_object" "python_script" {
bucket = aws_s3_bucket.emr_bucket.id
key = "scripts/spark_nyc_taxi_precipitation.py"
source = "./spark_nyc_taxi_precipitation.py"
}

resource "aws_s3_object" "bootstrap_script" {
bucket = aws_s3_bucket.emr_bucket.id
key = "scripts/bootstrap.sh"
content = <<EOF
#!/bin/bash
sudo pip install -U pandas numpy==1.26.4 pyarrow
EOF
}

resource "aws_emr_cluster" "emr_cluster" {
name = "EMR-Cluster"
release_label = "emr-7.5.0"
applications = ["Hadoop", "Spark"]

ec2_attributes {
instance_profile = aws_iam_instance_profile.emr_profile.arn
subnet_id = aws_subnet.emr_subnet.id
emr_managed_master_security_group = aws_security_group.allow_access.id
emr_managed_slave_security_group = aws_security_group.allow_access.id
}

master_instance_group {
instance_type = "r6i.16xlarge"
}

core_instance_group {
instance_type = "r6i.16xlarge"
instance_count = 3

ebs_config {
size = "40"
type = "gp3"
volumes_per_instance = 1
}
}

bootstrap_action {
name = "Install Python Dependencies"
path = "s3://${aws_s3_bucket.emr_bucket.id}/scripts/bootstrap.sh"
args = []
}
log_uri = "s3://${aws_s3_bucket.emr_bucket.id}/logs/"

step {
name = "Run Python Script"
action_on_failure = "TERMINATE_CLUSTER"
hadoop_jar_step {
jar = "command-runner.jar"
args = ["spark-submit", "s3://${aws_s3_bucket.emr_bucket.id}/scripts/spark_nyc_taxi_precipitation.py"]
}
}
auto_termination_policy {
idle_timeout = 60
}

service_role = aws_iam_role.emr_service_role.arn
tags = {
for-use-with-amazon-emr-managed-policies = "true"
}
}

resource "aws_iam_role" "emr_service_role" {
name = "EMR_Service_Role"

assume_role_policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "elasticmapreduce.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
EOF
}

resource "aws_iam_role_policy_attachment" "emr_service_role_policy" {
role = aws_iam_role.emr_service_role.name
policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonEMRServicePolicy_v2"
}
resource "aws_iam_role_policy" "emr_pass_intsance_role_policy" {
name = "EMR_Pass_Instance_Role_Policy"
role = aws_iam_role.emr_service_role.name
policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "PassRoleForEC2",
"Effect": "Allow",
"Action": "iam:PassRole",
"Resource": "${aws_iam_role.emr_instance_role.arn}"
}
]
}
EOF
}

resource "aws_iam_role_policy" "emr_instance_profile_policy" {
name = "EMR_S3_Access_Policy"
role = aws_iam_role.emr_instance_role.name

policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "S3Access",
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::${aws_s3_bucket.emr_bucket.id}/*",
"arn:aws:s3:::${aws_s3_bucket.emr_bucket.id}"
]
}
]
}
EOF
}
resource "aws_iam_role" "emr_instance_role" {
name = "EMR_EC2_Instance_Role"
assume_role_policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "ec2.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
EOF
}

resource "aws_iam_instance_profile" "emr_profile" {
name = "EMR_EC2_Instance_Profile"
role = aws_iam_role.emr_instance_role.name
}

resource "aws_subnet" "emr_subnet" {
vpc_id = aws_vpc.emr_vpc.id
cidr_block = "10.0.0.0/16"
availability_zone = "us-east-2b"
tags = {
for-use-with-amazon-emr-managed-policies = "true"
}
}

resource "aws_vpc" "emr_vpc" {
cidr_block = "10.0.0.0/16"
tags = {
for-use-with-amazon-emr-managed-policies = "true"
}
}

resource "aws_security_group" "allow_access" {
name = "allow_access"
description = "Allow inbound traffic"
vpc_id = aws_vpc.emr_vpc.id
revoke_rules_on_delete = true

ingress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = [aws_vpc.emr_vpc.cidr_block]
}

egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}

depends_on = [aws_subnet.emr_subnet]

lifecycle {
ignore_changes = [
ingress,
egress,
]
}

tags = {
for-use-with-amazon-emr-managed-policies = "true"
}
}



resource "aws_internet_gateway" "emr_igw" {
vpc_id = aws_vpc.emr_vpc.id
}

resource "aws_route_table" "emr_route_table" {
vpc_id = aws_vpc.emr_vpc.id

route {
cidr_block = "0.0.0.0/0"
gateway_id = aws_internet_gateway.emr_igw.id
}
}

resource "aws_route_table_association" "emr_route_table_association" {
subnet_id = aws_subnet.emr_subnet.id
route_table_id = aws_route_table.emr_route_table.id
}

data "aws_region" "current" {}

output "emr_cluster_id" {
value = aws_emr_cluster.emr_cluster.id
}
output "emr_cluster_region" {
value = data.aws_region.current.name
}
output "s3_bucket_id" {
value = aws_s3_bucket.emr_bucket.id
}
2 changes: 2 additions & 1 deletion benchmarks/spark/spark_nyc_taxi_precipitation.py
@@ -34,7 +34,8 @@ def get_monthly_travels_weather():
central_park_weather_observations["date"]
)

# Read in trip data using Spark. This reads a rewritten copy of the dataset,
# because Spark can't read the original dataset due to schema unification issues.
fhvhv_tripdata = spark.read.parquet(
"s3a://bodo-example-data/nyc-taxi/fhvhv_tripdata_rewrite/"
).drop("__index_level_0__")
17 changes: 17 additions & 0 deletions benchmarks/spark/wait_for_step.sh
@@ -0,0 +1,17 @@
#!/bin/bash

# Extract the cluster ID and region from Terraform's JSON output,
# then look up the ID of the cluster's step.
CLUSTER_ID="$(terraform output --json | jq -r '.emr_cluster_id.value')"
EMR_CLUSTER_REGION="$(terraform output --json | jq -r '.emr_cluster_region.value')"
STEP_ID="$(aws emr list-steps --cluster-id "$CLUSTER_ID" --query 'Steps[0].Id' --output text --region "$EMR_CLUSTER_REGION")"

# Wait for the EMR step to complete
aws emr wait step-complete --cluster-id "$CLUSTER_ID" --step-id "$STEP_ID" --region "$EMR_CLUSTER_REGION"

# Check the step status
STEP_STATUS=$(aws emr describe-step --cluster-id "$CLUSTER_ID" --step-id "$STEP_ID" --query 'Step.Status.State' --output text --region "$EMR_CLUSTER_REGION")

if [ "$STEP_STATUS" == "COMPLETED" ]; then
    echo "Step completed successfully."
else
    echo "Step did not complete successfully. Status: $STEP_STATUS"
    exit 1
fi
