From 52aaeec7b03251f3fcb4d1cf892df7c592e03408 Mon Sep 17 00:00:00 2001 From: Mike Jennings Date: Mon, 20 Oct 2014 23:05:09 -0700 Subject: [PATCH 1/4] [SPARK-3405] add subnet-id and vpc-id options to spark_ec2.py --- ec2/spark_ec2.py | 51 ++++++++++++++++++++++++++++++++---------------- 1 file changed, 34 insertions(+), 17 deletions(-) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 0d6b82b4944f3..10e862b201fb2 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -162,6 +162,10 @@ def parse_args(): parser.add_option( "--copy-aws-credentials", action="store_true", default=False, help="Add AWS credentials to hadoop configuration to allow Spark to access S3") + parser.add_option( + "--subnet-id", default=None, help="VPC Subnet id where to launch instances") + parser.add_option( + "--vpc-id", default=None, help="VPC ID where to launch instances") (opts, args) = parser.parse_args() if len(args) != 2: @@ -186,14 +190,14 @@ def parse_args(): # Get the EC2 security group of the given name, creating it if it doesn't exist -def get_or_make_group(conn, name): +def get_or_make_group(conn, name, vpc_id): groups = conn.get_all_security_groups() group = [g for g in groups if g.name == name] if len(group) > 0: return group[0] else: print "Creating security group " + name - return conn.create_security_group(name, "Spark EC2 group") + return conn.create_security_group(name, "Spark EC2 group", vpc_id) # Check whether a given EC2 instance object is in a state we consider active, @@ -304,15 +308,20 @@ def launch_cluster(conn, opts, cluster_name): print "Setting up security groups..." if opts.security_group_prefix is None: - master_group = get_or_make_group(conn, cluster_name + "-master") - slave_group = get_or_make_group(conn, cluster_name + "-slaves") + master_group = get_or_make_group(conn, cluster_name + "-master", opts.vpc_id) + slave_group = get_or_make_group(conn, cluster_name + "-slaves", opts.vpc_id) else: - master_group = get_or_make_group(conn, opts.security_group_prefix + "-master") - slave_group = get_or_make_group(conn, opts.security_group_prefix + "-slaves") + master_group = get_or_make_group(conn, opts.security_group_prefix + "-master", opts.vpc_id) + slave_group = get_or_make_group(conn, opts.security_group_prefix + "-slaves", opts.vpc_id) authorized_address = opts.authorized_address if master_group.rules == []: # Group was just now created - master_group.authorize(src_group=master_group) - master_group.authorize(src_group=slave_group) + if opts.vpc_id == None: + master_group.authorize(src_group=master_group) + master_group.authorize(src_group=slave_group) + else: + master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, src_group=slave_group) + master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, src_group=master_group) + master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, src_group=slave_group) master_group.authorize('tcp', 22, 22, authorized_address) master_group.authorize('tcp', 8080, 8081, authorized_address) master_group.authorize('tcp', 18080, 18080, authorized_address) @@ -324,8 +333,13 @@ def launch_cluster(conn, opts, cluster_name): if opts.ganglia: master_group.authorize('tcp', 5080, 5080, authorized_address) if slave_group.rules == []: # Group was just now created - slave_group.authorize(src_group=master_group) - slave_group.authorize(src_group=slave_group) + if opts.vpc_id == None: + slave_group.authorize(src_group=master_group) + slave_group.authorize(src_group=slave_group) + else: + slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, src_group=master_group) + slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, src_group=master_group) + slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, src_group=slave_group) slave_group.authorize('tcp', 22, 22, authorized_address) slave_group.authorize('tcp', 8080, 8081, authorized_address) slave_group.authorize('tcp', 50060, 50060, authorized_address) @@ -344,11 +358,11 @@ def launch_cluster(conn, opts, cluster_name): if opts.ami is None: opts.ami = get_spark_ami(opts) - additional_groups = [] + additional_group_ids = [] if opts.additional_security_group: - additional_groups = [sg - for sg in conn.get_all_security_groups() - if opts.additional_security_group in (sg.name, sg.id)] + additional_group_ids = [sg.id + for sg in conn.get_all_security_groups() + if opts.additional_security_group in (sg.name, sg.id)] print "Launching instances..." try: @@ -395,9 +409,10 @@ def launch_cluster(conn, opts, cluster_name): placement=zone, count=num_slaves_this_zone, key_name=opts.key_pair, - security_groups=[slave_group] + additional_groups, + security_group_ids=[slave_group.id] + additional_group_ids, instance_type=opts.instance_type, block_device_map=block_map, + subnet_id=opts.subnet_id, user_data=user_data_content) my_req_ids += [req.id for req in slave_reqs] i += 1 @@ -448,12 +463,13 @@ def launch_cluster(conn, opts, cluster_name): num_slaves_this_zone = get_partition(opts.slaves, num_zones, i) if num_slaves_this_zone > 0: slave_res = image.run(key_name=opts.key_pair, - security_groups=[slave_group] + additional_groups, + security_group_ids=[slave_group.id] + additional_group_ids, instance_type=opts.instance_type, placement=zone, min_count=num_slaves_this_zone, max_count=num_slaves_this_zone, block_device_map=block_map, + subnet_id=opts.subnet_id, user_data=user_data_content) slave_nodes += slave_res.instances print "Launched %d slaves in %s, regid = %s" % (num_slaves_this_zone, @@ -474,12 +490,13 @@ def launch_cluster(conn, opts, cluster_name): if opts.zone == 'all': opts.zone = random.choice(conn.get_all_zones()).name master_res = image.run(key_name=opts.key_pair, - security_groups=[master_group] + additional_groups, + security_group_ids=[master_group.id] + additional_group_ids, instance_type=master_type, placement=opts.zone, min_count=1, max_count=1, block_device_map=block_map, + subnet_id=opts.subnet_id, user_data=user_data_content) master_nodes = master_res.instances print "Launched master in %s, regid = %s" % (zone, master_res.id) From 731d94c3c98cf4b5c09d6f2e6a7bbac37a423d12 Mon Sep 17 00:00:00 2001 From: Mike Jennings Date: Mon, 15 Dec 2014 21:47:52 -0800 Subject: [PATCH 2/4] Update for code review. --- docs/ec2-scripts.md | 20 ++++++++++++++++++++ ec2/spark_ec2.py | 13 ++++++++++--- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/docs/ec2-scripts.md b/docs/ec2-scripts.md index ed51d0abb3a45..f99d6b1a2bbf7 100644 --- a/docs/ec2-scripts.md +++ b/docs/ec2-scripts.md @@ -94,6 +94,26 @@ another. permissions on your private key file, you can run `launch` with the `--resume` option to restart the setup process on an existing cluster. +# Launching a Cluster in a VPC + +- Run + `./spark-ec2 -k -i -s --vpc-id= --subnet-id= launch `, + where `` is the name of your EC2 key pair (that you gave it + when you created it), `` is the private key file for your + key pair, `` is the number of slave nodes to launch (try + 1 at first), `` is the name of your VPC, `` is the + name of your subnet, and `` is the name to give to your + cluster, `` is the name of your VPC, and `` is the + name of your subnet. + + For example: + + ```bash + export AWS_SECRET_ACCESS_KEY=AaBbCcDdEeFGgHhIiJjKkLlMmNnOoPpQqRrSsTtU +export AWS_ACCESS_KEY_ID=ABCDEFG1234567890123 +./spark-ec2 --key-pair=awskey --identity-file=awskey.pem --region=us-west-1 --zone=us-west-1a --vpc-id=vpc-a28d24c7 --subnet-id=subnet-4eb27b39 --spark-version=1.1.0 launch my-spark-cluster + ``` + # Running Applications - Go into the `ec2` directory in the release of Spark you downloaded. diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 6ba6cc704b01d..a1e3bc3444284 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -163,9 +163,9 @@ def parse_args(): "--copy-aws-credentials", action="store_true", default=False, help="Add AWS credentials to hadoop configuration to allow Spark to access S3") parser.add_option( - "--subnet-id", default=None, help="VPC Subnet id where to launch instances") + "--subnet-id", default=None, help="VPC subnet to launch instances in") parser.add_option( - "--vpc-id", default=None, help="VPC ID where to launch instances") + "--vpc-id", default=None, help="VPC to launch instances in") (opts, args) = parser.parse_args() if len(args) != 2: @@ -315,9 +315,12 @@ def launch_cluster(conn, opts, cluster_name): master_group.authorize(src_group=master_group) master_group.authorize(src_group=slave_group) else: - master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, src_group=slave_group) + master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, src_group=master_group) master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, src_group=master_group) + master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535, src_group=master_group) + master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, src_group=slave_group) master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, src_group=slave_group) + master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535, src_group=slave_group) master_group.authorize('tcp', 22, 22, authorized_address) master_group.authorize('tcp', 8080, 8081, authorized_address) master_group.authorize('tcp', 18080, 18080, authorized_address) @@ -335,7 +338,10 @@ def launch_cluster(conn, opts, cluster_name): else: slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, src_group=master_group) slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, src_group=master_group) + slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535, src_group=master_group) + slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, src_group=slave_group) slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, src_group=slave_group) + slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535, src_group=slave_group) slave_group.authorize('tcp', 22, 22, authorized_address) slave_group.authorize('tcp', 8080, 8081, authorized_address) slave_group.authorize('tcp', 50060, 50060, authorized_address) @@ -355,6 +361,7 @@ def launch_cluster(conn, opts, cluster_name): if opts.ami is None: opts.ami = get_spark_ami(opts) + # we use group ids to work around https://github.com/boto/boto/issues/350 additional_group_ids = [] if opts.additional_security_group: additional_group_ids = [sg.id From 4dc6756844e0effeef36a62eefdd96bb63cb3db9 Mon Sep 17 00:00:00 2001 From: Mike Jennings Date: Mon, 15 Dec 2014 21:55:41 -0800 Subject: [PATCH 3/4] Remove duplicate comment --- docs/ec2-scripts.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/ec2-scripts.md b/docs/ec2-scripts.md index f99d6b1a2bbf7..d50f445d7ecc7 100644 --- a/docs/ec2-scripts.md +++ b/docs/ec2-scripts.md @@ -103,8 +103,7 @@ permissions on your private key file, you can run `launch` with the key pair, `` is the number of slave nodes to launch (try 1 at first), `` is the name of your VPC, `` is the name of your subnet, and `` is the name to give to your - cluster, `` is the name of your VPC, and `` is the - name of your subnet. + cluster. For example: From be9cb43e48637d45b8a2936b2cdfb1971bf556a0 Mon Sep 17 00:00:00 2001 From: Mike Jennings Date: Tue, 16 Dec 2014 00:02:40 -0800 Subject: [PATCH 4/4] `pep8 spark_ec2.py` runs cleanly. --- ec2/spark_ec2.py | 40 ++++++++++++++++++++++++++-------------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index a1e3bc3444284..92adfd2d07b5b 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -311,16 +311,22 @@ def launch_cluster(conn, opts, cluster_name): slave_group = get_or_make_group(conn, cluster_name + "-slaves", opts.vpc_id) authorized_address = opts.authorized_address if master_group.rules == []: # Group was just now created - if opts.vpc_id == None: + if opts.vpc_id is None: master_group.authorize(src_group=master_group) master_group.authorize(src_group=slave_group) else: - master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, src_group=master_group) - master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, src_group=master_group) - master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535, src_group=master_group) - master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, src_group=slave_group) - master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, src_group=slave_group) - master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535, src_group=slave_group) + master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, + src_group=master_group) + master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, + src_group=master_group) + master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535, + src_group=master_group) + master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, + src_group=slave_group) + master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, + src_group=slave_group) + master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535, + src_group=slave_group) master_group.authorize('tcp', 22, 22, authorized_address) master_group.authorize('tcp', 8080, 8081, authorized_address) master_group.authorize('tcp', 18080, 18080, authorized_address) @@ -332,16 +338,22 @@ def launch_cluster(conn, opts, cluster_name): if opts.ganglia: master_group.authorize('tcp', 5080, 5080, authorized_address) if slave_group.rules == []: # Group was just now created - if opts.vpc_id == None: + if opts.vpc_id is None: slave_group.authorize(src_group=master_group) slave_group.authorize(src_group=slave_group) else: - slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, src_group=master_group) - slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, src_group=master_group) - slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535, src_group=master_group) - slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, src_group=slave_group) - slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, src_group=slave_group) - slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535, src_group=slave_group) + slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, + src_group=master_group) + slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, + src_group=master_group) + slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535, + src_group=master_group) + slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, + src_group=slave_group) + slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, + src_group=slave_group) + slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535, + src_group=slave_group) slave_group.authorize('tcp', 22, 22, authorized_address) slave_group.authorize('tcp', 8080, 8081, authorized_address) slave_group.authorize('tcp', 50060, 50060, authorized_address)