BSE-4340: Spark benchmark EMR terraform #50

Merged · 15 commits · Dec 12, 2024
95 changes: 95 additions & 0 deletions benchmarks/spark/README.md
@@ -0,0 +1,95 @@
# Spark NYC Taxi Precipitation Benchmark on EMR

---

## Prerequisites

1. **AWS CLI**: Installed and configured with access keys.
2. **Terraform**: Installed on your local machine.
3. **jq**: Installed on your local machine.
4. **gzip**: Installed on your local machine.
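
A quick way to confirm the tools are available (any recent release should work):

```bash
aws --version
terraform -version
jq --version
gzip --version
```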

---

## Steps to Use the Terraform Script

### 1. Enter the benchmarks/spark Directory
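
From the repository root:

```bash
cd benchmarks/spark
```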

### 2. Initialize Terraform

Run the following command to initialize the Terraform project:

```bash
terraform init
```

### 3. Deploy the Infrastructure

Apply the Terraform script to deploy the resources:

```bash
terraform apply
```

You will be prompted to confirm the deployment. Type `yes` to proceed.
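For non-interactive runs, `terraform apply -auto-approve` skips the prompt.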

### 4. Verify Deployment

After the deployment completes, the EMR cluster and associated resources are provisioned, and the Python script is uploaded to the newly created S3 bucket.
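
One way to check the cluster state from the CLI (this assumes the Terraform outputs defined in `emr.tf`):

```bash
CLUSTER_ID="$(terraform output --json | jq -r '.emr_cluster_id.value')"
REGION="$(terraform output --json | jq -r '.emr_cluster_region.value')"
aws emr describe-cluster --cluster-id "$CLUSTER_ID" --region "$REGION" \
  --query 'Cluster.Status.State' --output text
```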

---

## Running the Python Script

The script will automatically run on the EMR cluster as a step during the deployment.
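
To inspect the submitted step and its status without blocking (reusing the `CLUSTER_ID` and `REGION` variables shown above):

```bash
aws emr list-steps --cluster-id "$CLUSTER_ID" --region "$REGION" \
  --query 'Steps[].[Id,Name,Status.State]' --output table
```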

---

## Wait for the Script to Complete

Run the following command to wait for the script to complete:

```bash
./wait_for_step.sh
```

This will take a few minutes.

---

## Retrieving Logs

### 1. Download Logs Using AWS CLI

Use the AWS CLI to download the logs:

```bash
S3_BUCKET="$(terraform output --json | jq -r '.s3_bucket_id.value')"
CLUSTER_ID="$(terraform output --json | jq -r '.emr_cluster_id.value')"
REGION="$(terraform output --json | jq -r '.emr_cluster_region.value')"
aws s3 cp "s3://$S3_BUCKET/logs/$CLUSTER_ID" ./emr-logs --recursive --region "$REGION"
```

### 2. Explore the Logs

Logs are structured into the following directories:

- `steps/`: Logs for each step.
- `node/`: Logs for individual cluster nodes.
- `applications/`: Logs for applications like Spark.

Example commands to decompress and view the step logs, including the reported execution time:

```bash
gzip -d ./emr-logs/steps/*/*
cat ./emr-logs/steps/*/stdout
```
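
The exact wording of the timing line depends on what the script prints, so treat this pattern as a guess, but a grep usually narrows it down:

```bash
grep -i "time" ./emr-logs/steps/*/stdout
```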

---

## Cleaning Up

To destroy the deployed resources and avoid incurring additional costs, run:

```bash
terraform destroy
```

You will be prompted to confirm. Type `yes` to proceed.
250 changes: 250 additions & 0 deletions benchmarks/spark/emr.tf
@@ -0,0 +1,250 @@
provider "aws" {
region = "us-east-2"
}

resource "aws_s3_bucket" "emr_bucket" {
bucket = "spark-benchmark-python-script-bucket-${random_id.bucket_id.hex}"
force_destroy = true
}

resource "random_id" "bucket_id" {
byte_length = 8
}

resource "aws_s3_object" "python_script" {
bucket = aws_s3_bucket.emr_bucket.id
key = "scripts/spark_nyc_taxi_preciptation.py"
Contributor: Can we change the name of this file in this PR since I believe there was a typo originally?

Suggested change:
-  key    = "scripts/spark_nyc_taxi_preciptation.py"
+  key    = "scripts/spark_nyc_taxi_precipitation.py"

And change the file name too.

Contributor (author): Good catch, fixed
source = "./spark_nyc_taxi_preciptation.py"
}

resource "aws_s3_object" "bootstrap_script" {
bucket = aws_s3_bucket.emr_bucket.id
key = "scripts/bootstrap.sh"
content = <<EOF
#!/bin/bash
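# Install the Python packages the benchmark needs on every node (numpy pinned below 2.0)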
sudo pip install -U pandas numpy==1.26.4 pyarrow
EOF
}

resource "aws_emr_cluster" "emr_cluster" {
name = "EMR-Cluster"
release_label = "emr-7.5.0"
applications = ["Hadoop", "Spark"]

ec2_attributes {
instance_profile = aws_iam_instance_profile.emr_profile.arn
subnet_id = aws_subnet.emr_subnet.id
emr_managed_master_security_group = aws_security_group.allow_access.id
emr_managed_slave_security_group = aws_security_group.allow_access.id
}

master_instance_group {
instance_type = "r6i.16xlarge"
}

core_instance_group {
instance_type = "r6i.16xlarge"
instance_count = 3

ebs_config {
size = "40"
type = "gp3"
volumes_per_instance = 1
}
}

bootstrap_action {
name = "Install Python Dependencies"
path = "s3://${aws_s3_bucket.emr_bucket.id}/scripts/bootstrap.sh"
args = []
}
log_uri = "s3://${aws_s3_bucket.emr_bucket.id}/logs/"

step {
name = "Run Python Script"
action_on_failure = "TERMINATE_CLUSTER"
hadoop_jar_step {
jar = "command-runner.jar"
args = ["spark-submit", "s3://${aws_s3_bucket.emr_bucket.id}/scripts/spark_nyc_taxi_preciptation.py"]
}
}
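  # Terminate the cluster after it sits idle (value is in seconds; 60 is the AWS minimum)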
auto_termination_policy {
idle_timeout = 60
}

service_role = aws_iam_role.emr_service_role.arn
tags = {
for-use-with-amazon-emr-managed-policies = "true"
}
}

resource "aws_iam_role" "emr_service_role" {
name = "EMR_Service_Role"

assume_role_policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "elasticmapreduce.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
EOF
}

resource "aws_iam_role_policy_attachment" "emr_service_role_policy" {
role = aws_iam_role.emr_service_role.name
policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonEMRServicePolicy_v2"
}
resource "aws_iam_role_policy" "emr_pass_intsance_role_policy" {
name = "EMR_Pass_Instance_Role_Policy"
role = aws_iam_role.emr_service_role.name
policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "PassRoleForEC2",
"Effect": "Allow",
"Action": "iam:PassRole",
"Resource": "${aws_iam_role.emr_instance_role.arn}"
}
]
}
EOF
}

resource "aws_iam_role_policy" "emr_instance_profile_policy" {
name = "EMR_S3_Access_Policy"
role = aws_iam_role.emr_instance_role.name

policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "S3Access",
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::${aws_s3_bucket.emr_bucket.id}/*",
"arn:aws:s3:::${aws_s3_bucket.emr_bucket.id}"
]
}
]
}
EOF
}
resource "aws_iam_role" "emr_instance_role" {
name = "EMR_EC2_Instance_Role"
assume_role_policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "ec2.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
EOF
}

resource "aws_iam_instance_profile" "emr_profile" {
name = "EMR_EC2_Instance_Profile"
role = aws_iam_role.emr_instance_role.name
}

resource "aws_subnet" "emr_subnet" {
vpc_id = aws_vpc.emr_vpc.id
cidr_block = "10.0.0.0/16"
availability_zone = "us-east-2b"
tags = {
for-use-with-amazon-emr-managed-policies = "true"
}
}

resource "aws_vpc" "emr_vpc" {
cidr_block = "10.0.0.0/16"
tags = {
for-use-with-amazon-emr-managed-policies = "true"
}
}

resource "aws_security_group" "allow_access" {
name = "allow_access"
description = "Allow inbound traffic"
vpc_id = aws_vpc.emr_vpc.id
revoke_rules_on_delete = true

ingress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = [aws_vpc.emr_vpc.cidr_block]
}

egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}

depends_on = [aws_subnet.emr_subnet]

lifecycle {
ignore_changes = [
ingress,
egress,
]
}

tags = {
for-use-with-amazon-emr-managed-policies = "true"
}
}



resource "aws_internet_gateway" "emr_igw" {
vpc_id = aws_vpc.emr_vpc.id
}

resource "aws_route_table" "emr_route_table" {
vpc_id = aws_vpc.emr_vpc.id

route {
cidr_block = "0.0.0.0/0"
gateway_id = aws_internet_gateway.emr_igw.id
}
}

resource "aws_route_table_association" "emr_route_table_association" {
subnet_id = aws_subnet.emr_subnet.id
route_table_id = aws_route_table.emr_route_table.id
}

data "aws_region" "current" {}

output "emr_cluster_id" {
value = aws_emr_cluster.emr_cluster.id
}
output "emr_cluster_region" {
value = data.aws_region.current.name
}
output "s3_bucket_id" {
value = aws_s3_bucket.emr_bucket.id
}
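
These three outputs are what the README commands and `wait_for_step.sh` consume via `terraform output --json`; to print them all at once:

```bash
terraform output --json | jq -r '.emr_cluster_id.value, .emr_cluster_region.value, .s3_bucket_id.value'
```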
3 changes: 2 additions & 1 deletion benchmarks/spark/spark_nyc_taxi_preciptation.py
@@ -34,7 +34,8 @@ def get_monthly_travels_weather():
central_park_weather_observations["date"]
)

# Read in trip data using spark
# Read in trip data using Spark. This reads a rewritten copy of the dataset
# because Spark can't read the original due to schema unification issues.
fhvhv_tripdata = spark.read.parquet(
"s3a://bodo-example-data/nyc-taxi/fhvhv_tripdata_rewrite/"
).drop("__index_level_0__")
17 changes: 17 additions & 0 deletions benchmarks/spark/wait_for_step.sh
@@ -0,0 +1,17 @@
#!/bin/bash

# Extract cluster and step IDs from Terraform JSON output
CLUSTER_ID="$(terraform output --json | jq -r '.emr_cluster_id.value')"
EMR_CLUSTER_REGION="$(terraform output --json | jq -r '.emr_cluster_region.value')"
STEP_ID="$(aws emr list-steps --cluster-id "$CLUSTER_ID" --query 'Steps[0].Id' --output text --region "$EMR_CLUSTER_REGION")"

# Wait for the EMR step to complete
aws emr wait step-complete --cluster-id "$CLUSTER_ID" --step-id "$STEP_ID" --region "$EMR_CLUSTER_REGION"

# Check the step status
STEP_STATUS=$(aws emr describe-step --cluster-id "$CLUSTER_ID" --step-id "$STEP_ID" --query 'Step.Status.State' --output text --region "$EMR_CLUSTER_REGION")

if [ "$STEP_STATUS" == "COMPLETED" ]; then
echo "Step completed successfully."
else
echo "Step did not complete successfully. Status: $STEP_STATUS"
fi
