
Commit

Update tasks.py to work with AMI 4.x.x (close #20)
alexanderdean committed Mar 28, 2016
1 parent f9c6859 commit f8a7807
Showing 1 changed file with 46 additions and 43 deletions.

tasks.py
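The diff below migrates run_emr from boto 2's EMR interface to boto3: the run_jobflow call pinned to ami_version="3.6", the install-spark bootstrap action, and the InstallHiveStep/ScriptRunnerStep steps are replaced by a single run_job_flow call that requests ReleaseLabel='emr-4.4.0', installs Spark natively via Applications, and submits spark-submit through command-runner.jar.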
@@ -15,9 +15,7 @@
 from boto.s3.connection import Location
 from boto.s3.key import Key
 
-import boto.emr
-from boto.emr.step import InstallHiveStep, ScriptRunnerStep
-from boto.emr.bootstrap_action import BootstrapAction
+import boto3
 
 HELLO_TXT = "in/hello.txt"
 JAR_FILE = "spark-example-project-0.4.0.jar"
@@ -83,44 +81,49 @@ def upload(profile, bucket):

 @task
 def run_emr(profile, bucket, ec2_keyname, vpc_subnet_id):
-    c = boto.connect_s3(profile_name=profile)
-    b = c.get_bucket(bucket)
-    r = get_valid_region(b.get_location())
-
-    bootstrap_actions = [
-        BootstrapAction("Install Spark", "s3://support.elasticmapreduce/spark/install-spark", ["-x"])
-    ]
-
-    args = [
-        "/home/hadoop/spark/bin/spark-submit",
-        "--deploy-mode",
-        "cluster",
-        "--master",
-        "yarn-cluster",
-        "--class",
-        "com.snowplowanalytics.spark.WordCountJob",
-        "s3://" + bucket + "/jar/" + JAR_FILE,
-        "s3n://" + bucket + "/" + HELLO_TXT,
-        "s3n://" + bucket + "/out"
-    ]
-    steps = [
-        InstallHiveStep(),
-        ScriptRunnerStep("Run WordCountJob", step_args=args)
-    ]
-
-    conn = boto.emr.connect_to_region(r, profile_name=profile)
-    job_id = conn.run_jobflow(
-        name="Spark Example Project",
-        log_uri="s3://" + bucket + "/logs",
-        ec2_keyname=ec2_keyname,
-        master_instance_type="m3.xlarge",
-        slave_instance_type="m3.xlarge",
-        num_instances=3,
-        enable_debugging=True,
-        ami_version="3.6",
-        steps=steps,
-        bootstrap_actions=bootstrap_actions,
-        job_flow_role="EMR_EC2_DefaultRole",
-        service_role="EMR_DefaultRole"
+    s3_client = boto.connect_s3(profile_name=profile)
+    region = get_valid_region(s3_client.get_bucket(bucket).get_location())
+
+    boto3.setup_default_session(profile_name=profile)
+    client = boto3.client('emr', region_name=region)
+    response = client.run_job_flow(
+        Name='Spark Example Project',
+        LogUri="s3://" + bucket + "/logs",
+        Instances={
+            'MasterInstanceType': 'm3.xlarge',
+            'SlaveInstanceType': 'm3.xlarge',
+            'InstanceCount': 3,
+            'KeepJobFlowAliveWhenNoSteps': False,
+            'TerminationProtected': False,
+            'Ec2KeyName': ec2_keyname,
+            'Ec2SubnetId': vpc_subnet_id,
+        },
+        ReleaseLabel='emr-4.4.0',
+        Applications=[
+            {
+                'Name': 'Spark'
+            },
+        ],
+        VisibleToAllUsers=True,
+        JobFlowRole='EMR_EC2_DefaultRole',
+        ServiceRole='EMR_DefaultRole',
+        Steps=[
+            {
+                'Name': 'Run Spark WordCountJob',
+                'ActionOnFailure': 'TERMINATE_JOB_FLOW',
+                'HadoopJarStep': {
+                    'Jar': "command-runner.jar",
+                    'Args': [
+                        "spark-submit",
+                        "--deploy-mode", "cluster",
+                        "--master", "yarn-cluster",
+                        "--class", "com.snowplowanalytics.spark.WordCountJob",
+                        "s3://" + bucket + "/jar/" + JAR_FILE,
+                        "s3n://" + bucket + "/" + HELLO_TXT,
+                        "s3n://" + bucket + "/out"
+                    ],
+                },
+            },
+        ],
     )
-    print "Started jobflow " + job_id
+    print "Started jobflow " + response['JobFlowId']
