64 changes: 47 additions & 17 deletions ec2/spark_ec2.py
@@ -282,6 +282,10 @@ def parse_args():
     parser.add_option(
         "--vpc-id", default=None,
         help="VPC to launch instances in")
+    parser.add_option(
+        "--private-ips", action="store_true", default=False,
+        help="Use private IPs for instances rather than public if VPC/subnet " +
+             "requires that.")
 
     (opts, args) = parser.parse_args()
     if len(args) != 2:
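Note: the new option is a plain optparse boolean flag, so opts.private_ips defaults to False and flips to True only when --private-ips is passed on the command line. A minimal standalone sketch of that pattern (illustrative, not part of the patch):

    from optparse import OptionParser

    parser = OptionParser()
    parser.add_option(
        "--private-ips", action="store_true", default=False,
        help="Use private IPs for instances rather than public if VPC/subnet " +
             "requires that.")

    (opts, args) = parser.parse_args(["--private-ips"])
    print opts.private_ips  # True; False whenever the flag is absent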
@@ -707,7 +711,7 @@ def get_instances(group_names):
 # Deploy configuration files and run setup scripts on a newly launched
 # or started EC2 cluster.
 def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
-    master = master_nodes[0].public_dns_name
+    master = get_dns_name(master_nodes[0], opts.private_ips)
     if deploy_ssh_key:
         print "Generating cluster's SSH key on master..."
         key_setup = """
@@ -719,8 +723,9 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
         dot_ssh_tar = ssh_read(master, opts, ['tar', 'c', '.ssh'])
         print "Transferring cluster's SSH key to slaves..."
         for slave in slave_nodes:
-            print slave.public_dns_name
-            ssh_write(slave.public_dns_name, opts, ['tar', 'x'], dot_ssh_tar)
+            slave_address = get_dns_name(slave, opts.private_ips)
+            print slave_address
+            ssh_write(slave_address, opts, ['tar', 'x'], dot_ssh_tar)
 
     modules = ['spark', 'ephemeral-hdfs', 'persistent-hdfs',
                'mapreduce', 'spark-standalone', 'tachyon']
@@ -809,7 +814,8 @@ def is_cluster_ssh_available(cluster_instances, opts):
     Check if SSH is available on all the instances in a cluster.
     """
     for i in cluster_instances:
-        if not is_ssh_available(host=i.public_dns_name, opts=opts):
+        dns_name = get_dns_name(i, opts.private_ips)
+        if not is_ssh_available(host=dns_name, opts=opts):
             return False
     else:
         return True
@@ -923,7 +929,7 @@ def get_num_disks(instance_type):
 #
 # root_dir should be an absolute path to the directory with the files we want to deploy.
 def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
-    active_master = master_nodes[0].public_dns_name
+    active_master = get_dns_name(master_nodes[0], opts.private_ips)
 
     num_disks = get_num_disks(opts.instance_type)
     hdfs_data_dirs = "/mnt/ephemeral-hdfs/data"
@@ -948,10 +954,12 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
         print "Deploying Spark via git hash; Tachyon won't be set up"
         modules = filter(lambda x: x != "tachyon", modules)
 
+    master_addresses = [get_dns_name(i, opts.private_ips) for i in master_nodes]
+    slave_addresses = [get_dns_name(i, opts.private_ips) for i in slave_nodes]
     template_vars = {
-        "master_list": '\n'.join([i.public_dns_name for i in master_nodes]),
+        "master_list": '\n'.join(master_addresses),
         "active_master": active_master,
-        "slave_list": '\n'.join([i.public_dns_name for i in slave_nodes]),
+        "slave_list": '\n'.join(slave_addresses),
         "cluster_url": cluster_url,
         "hdfs_data_dirs": hdfs_data_dirs,
         "mapred_local_dirs": mapred_local_dirs,
@@ -1011,7 +1019,7 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
 #
 # root_dir should be an absolute path.
 def deploy_user_files(root_dir, opts, master_nodes):
-    active_master = master_nodes[0].public_dns_name
+    active_master = get_dns_name(master_nodes[0], opts.private_ips)
     command = [
         'rsync', '-rv',
         '-e', stringify_command(ssh_command(opts)),
@@ -1122,6 +1130,20 @@ def get_partition(total, num_partitions, current_partitions):
     return num_slaves_this_zone
 
 
+# Gets the IP address, taking into account the --private-ips flag
+def get_ip_address(instance, private_ips=False):
+    ip = instance.ip_address if not private_ips else \
+        instance.private_ip_address
+    return ip
+
+
+# Gets the DNS name, taking into account the --private-ips flag
+def get_dns_name(instance, private_ips=False):
+    dns = instance.public_dns_name if not private_ips else \
+        instance.private_ip_address
+    return dns
+
+
 def real_main():
     (opts, action, cluster_name) = parse_args()
 
@@ -1230,7 +1252,7 @@ def real_main():
         if any(master_nodes + slave_nodes):
             print "The following instances will be terminated:"
             for inst in master_nodes + slave_nodes:
-                print "> %s" % inst.public_dns_name
+                print "> %s" % get_dns_name(inst, opts.private_ips)
             print "ALL DATA ON ALL NODES WILL BE LOST!!"
 
         msg = "Are you sure you want to destroy the cluster {c}? (y/N) ".format(c=cluster_name)
@@ -1294,13 +1316,17 @@ def real_main():
 
     elif action == "login":
         (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
-        master = master_nodes[0].public_dns_name
-        print "Logging into master " + master + "..."
-        proxy_opt = []
-        if opts.proxy_port is not None:
-            proxy_opt = ['-D', opts.proxy_port]
-        subprocess.check_call(
-            ssh_command(opts) + proxy_opt + ['-t', '-t', "%s@%s" % (opts.user, master)])
+        if not master_nodes[0].public_dns_name and not opts.private_ips:
+            print "Master has no public DNS name. Maybe you meant to specify " \
+                  "--private-ips?"
+        else:
+            master = get_dns_name(master_nodes[0], opts.private_ips)
+            print "Logging into master " + master + "..."
+            proxy_opt = []
+            if opts.proxy_port is not None:
+                proxy_opt = ['-D', opts.proxy_port]
+            subprocess.check_call(
+                ssh_command(opts) + proxy_opt + ['-t', '-t', "%s@%s" % (opts.user, master)])
 
     elif action == "reboot-slaves":
         response = raw_input(
@@ -1318,7 +1344,11 @@ def real_main():
 
     elif action == "get-master":
         (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
-        print master_nodes[0].public_dns_name
+        if not master_nodes[0].public_dns_name and not opts.private_ips:
+            print "Master has no public DNS name. Maybe you meant to specify " \
+                  "--private-ips?"
+        else:
+            print get_dns_name(master_nodes[0], opts.private_ips)
 
     elif action == "stop":
         response = raw_input(
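The login and get-master actions now share a guard: boto reports an empty public_dns_name for instances in a private subnet, and without the check, login would try to SSH to an empty hostname and fail obscurely. The guard distilled into a standalone sketch (resolve_master is a hypothetical helper, not in the patch):

    def resolve_master(master_instance, private_ips):
        # Bail out when there is no public DNS name and the user did not
        # ask for private IPs.
        if not master_instance.public_dns_name and not private_ips:
            raise SystemExit("Master has no public DNS name. "
                             "Maybe you meant to specify --private-ips?")
        return get_dns_name(master_instance, private_ips)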