Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions docs/ec2-scripts.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,25 @@ another.
permissions on your private key file, you can run `launch` with the
`--resume` option to restart the setup process on an existing cluster.

# Launching a Cluster in a VPC

- Run
`./spark-ec2 -k <keypair> -i <key-file> -s <num-slaves> --vpc-id=<vpc-id> --subnet-id=<subnet-id> launch <cluster-name>`,
where `<keypair>` is the name of your EC2 key pair (the name you gave it
when you created it), `<key-file>` is the private key file for your
key pair, `<num-slaves>` is the number of slave nodes to launch (try
1 at first), `<vpc-id>` is the ID of your VPC, `<subnet-id>` is the
ID of your subnet, and `<cluster-name>` is the name to give to your
cluster.

For example:

```bash
export AWS_SECRET_ACCESS_KEY=AaBbCcDdEeFGgHhIiJjKkLlMmNnOoPpQqRrSsTtU
export AWS_ACCESS_KEY_ID=ABCDEFG1234567890123
./spark-ec2 --key-pair=awskey --identity-file=awskey.pem --region=us-west-1 --zone=us-west-1a --vpc-id=vpc-a28d24c7 --subnet-id=subnet-4eb27b39 --spark-version=1.1.0 launch my-spark-cluster
```

# Running Applications

- Go into the `ec2` directory in the release of Spark you downloaded.
Expand Down
66 changes: 51 additions & 15 deletions ec2/spark_ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ def parse_args():
parser.add_option(
"--copy-aws-credentials", action="store_true", default=False,
help="Add AWS credentials to hadoop configuration to allow Spark to access S3")
parser.add_option(
"--subnet-id", default=None, help="VPC subnet to launch instances in")
parser.add_option(
"--vpc-id", default=None, help="VPC to launch instances in")

(opts, args) = parser.parse_args()
if len(args) != 2:
Expand All @@ -186,14 +190,14 @@ def parse_args():


# Get the EC2 security group of the given name, creating it if it doesn't exist
def get_or_make_group(conn, name):
def get_or_make_group(conn, name, vpc_id):
groups = conn.get_all_security_groups()
group = [g for g in groups if g.name == name]
if len(group) > 0:
return group[0]
else:
print "Creating security group " + name
return conn.create_security_group(name, "Spark EC2 group")
return conn.create_security_group(name, "Spark EC2 group", vpc_id)


# Check whether a given EC2 instance object is in a state we consider active,
Expand Down Expand Up @@ -303,12 +307,26 @@ def launch_cluster(conn, opts, cluster_name):
user_data_content = user_data_file.read()

print "Setting up security groups..."
master_group = get_or_make_group(conn, cluster_name + "-master")
slave_group = get_or_make_group(conn, cluster_name + "-slaves")
master_group = get_or_make_group(conn, cluster_name + "-master", opts.vpc_id)
slave_group = get_or_make_group(conn, cluster_name + "-slaves", opts.vpc_id)
authorized_address = opts.authorized_address
if master_group.rules == []: # Group was just now created
master_group.authorize(src_group=master_group)
master_group.authorize(src_group=slave_group)
if opts.vpc_id is None:
master_group.authorize(src_group=master_group)
master_group.authorize(src_group=slave_group)
else:
master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
src_group=master_group)
master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
src_group=master_group)
master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
src_group=master_group)
master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
src_group=slave_group)
master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
src_group=slave_group)
master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
src_group=slave_group)
master_group.authorize('tcp', 22, 22, authorized_address)
master_group.authorize('tcp', 8080, 8081, authorized_address)
master_group.authorize('tcp', 18080, 18080, authorized_address)
Expand All @@ -320,8 +338,22 @@ def launch_cluster(conn, opts, cluster_name):
if opts.ganglia:
master_group.authorize('tcp', 5080, 5080, authorized_address)
if slave_group.rules == []: # Group was just now created
slave_group.authorize(src_group=master_group)
slave_group.authorize(src_group=slave_group)
if opts.vpc_id is None:
slave_group.authorize(src_group=master_group)
slave_group.authorize(src_group=slave_group)
else:
slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
src_group=master_group)
slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
src_group=master_group)
slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
src_group=master_group)
slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
src_group=slave_group)
slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
src_group=slave_group)
slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
src_group=slave_group)
slave_group.authorize('tcp', 22, 22, authorized_address)
slave_group.authorize('tcp', 8080, 8081, authorized_address)
slave_group.authorize('tcp', 50060, 50060, authorized_address)
Expand All @@ -341,11 +373,12 @@ def launch_cluster(conn, opts, cluster_name):
if opts.ami is None:
opts.ami = get_spark_ami(opts)

additional_groups = []
# we use group ids to work around https://github.com/boto/boto/issues/350
additional_group_ids = []
if opts.additional_security_group:
additional_groups = [sg
for sg in conn.get_all_security_groups()
if opts.additional_security_group in (sg.name, sg.id)]
additional_group_ids = [sg.id
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this change from sg to sg.id is due to boto/boto#350, can you add a comment referencing that issue? Something like

# we use group ids to work around https://github.com/boto/boto/issues/350

would be fine.

for sg in conn.get_all_security_groups()
if opts.additional_security_group in (sg.name, sg.id)]
print "Launching instances..."

try:
Expand Down Expand Up @@ -392,9 +425,10 @@ def launch_cluster(conn, opts, cluster_name):
placement=zone,
count=num_slaves_this_zone,
key_name=opts.key_pair,
security_groups=[slave_group] + additional_groups,
security_group_ids=[slave_group.id] + additional_group_ids,
instance_type=opts.instance_type,
block_device_map=block_map,
subnet_id=opts.subnet_id,
user_data=user_data_content)
my_req_ids += [req.id for req in slave_reqs]
i += 1
Expand Down Expand Up @@ -441,12 +475,13 @@ def launch_cluster(conn, opts, cluster_name):
num_slaves_this_zone = get_partition(opts.slaves, num_zones, i)
if num_slaves_this_zone > 0:
slave_res = image.run(key_name=opts.key_pair,
security_groups=[slave_group] + additional_groups,
security_group_ids=[slave_group.id] + additional_group_ids,
instance_type=opts.instance_type,
placement=zone,
min_count=num_slaves_this_zone,
max_count=num_slaves_this_zone,
block_device_map=block_map,
subnet_id=opts.subnet_id,
user_data=user_data_content)
slave_nodes += slave_res.instances
print "Launched %d slaves in %s, regid = %s" % (num_slaves_this_zone,
Expand All @@ -467,12 +502,13 @@ def launch_cluster(conn, opts, cluster_name):
if opts.zone == 'all':
opts.zone = random.choice(conn.get_all_zones()).name
master_res = image.run(key_name=opts.key_pair,
security_groups=[master_group] + additional_groups,
security_group_ids=[master_group.id] + additional_group_ids,
instance_type=master_type,
placement=opts.zone,
min_count=1,
max_count=1,
block_device_map=block_map,
subnet_id=opts.subnet_id,
user_data=user_data_content)
master_nodes = master_res.instances
print "Launched master in %s, regid = %s" % (zone, master_res.id)
Expand Down