From c5ceda308225e90e82d5d3a94685ad30e749069c Mon Sep 17 00:00:00 2001
From: Vladimir Grigor
Date: Wed, 14 Jan 2015 08:57:59 +0200
Subject: [PATCH 1/2] SPARK-5242

---
 ec2/spark_ec2.py | 38 ++++++++++++++++++++++++++++----------
 1 file changed, 28 insertions(+), 10 deletions(-)

diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index abab209a05ba..8f01c141b936 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -612,12 +612,30 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
         sys.exit(1)
 
 
+def get_connection_address(instance):
+    """
+    Return the DNS name if possible, otherwise return an IP address.
+    It is common that VPC instances do not have a public DNS name, a public IP, or a private DNS name.
+    But a private IP is always there.
+
+    :type instance: :class:`ec2.instance.Instance`
+    :return: string -- the first non-None of public_dns_name, ip_address, private_dns_name, private_ip_address
+    """
+
+    address = instance.public_dns_name or instance.ip_address or instance.private_dns_name or instance.private_ip_address
+
+    if address is None:
+        raise RuntimeError('Cannot get address from instance %s' % instance)
+
+    return address
+
+
 # Deploy configuration files and run setup scripts on a newly launched
 # or started EC2 cluster.
 def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
-    master = master_nodes[0].public_dns_name
+    master = get_connection_address(master_nodes[0])
     if deploy_ssh_key:
         print "Generating cluster's SSH key on master..."
         key_setup = """
@@ -629,8 +647,8 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
         dot_ssh_tar = ssh_read(master, opts, ['tar', 'c', '.ssh'])
         print "Transferring cluster's SSH key to slaves..."
         for slave in slave_nodes:
-            print slave.public_dns_name
-            ssh_write(slave.public_dns_name, opts, ['tar', 'x'], dot_ssh_tar)
+            print get_connection_address(slave)
+            ssh_write(get_connection_address(slave), opts, ['tar', 'x'], dot_ssh_tar)
 
     modules = ['spark', 'ephemeral-hdfs', 'persistent-hdfs',
                'mapreduce', 'spark-standalone', 'tachyon']
@@ -697,7 +715,7 @@ def is_cluster_ssh_available(cluster_instances, opts):
     Check if SSH is available on all the instances in a cluster.
     """
     for i in cluster_instances:
-        if not is_ssh_available(host=i.ip_address, opts=opts):
+        if not is_ssh_available(host=get_connection_address(i), opts=opts):
             return False
     else:
         return True
@@ -811,7 +829,7 @@ def get_num_disks(instance_type):
 #
 # root_dir should be an absolute path to the directory with the files we want to deploy.
 def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
-    active_master = master_nodes[0].public_dns_name
+    active_master = get_connection_address(master_nodes[0])
 
     num_disks = get_num_disks(opts.instance_type)
     hdfs_data_dirs = "/mnt/ephemeral-hdfs/data"
@@ -833,9 +851,9 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
         spark_v = "%s|%s" % (opts.spark_git_repo, opts.spark_version)
 
     template_vars = {
-        "master_list": '\n'.join([i.public_dns_name for i in master_nodes]),
+        "master_list": '\n'.join([get_connection_address(i) for i in master_nodes]),
         "active_master": active_master,
-        "slave_list": '\n'.join([i.public_dns_name for i in slave_nodes]),
+        "slave_list": '\n'.join([get_connection_address(i) for i in slave_nodes]),
         "cluster_url": cluster_url,
         "hdfs_data_dirs": hdfs_data_dirs,
         "mapred_local_dirs": mapred_local_dirs,
@@ -1039,7 +1057,7 @@ def real_main():
         (master_nodes, slave_nodes) = get_existing_cluster(
             conn, opts, cluster_name, die_on_error=False)
         for inst in master_nodes + slave_nodes:
-            print "> %s" % inst.public_dns_name
+            print "> %s" % get_connection_address(inst)
 
         msg = "ALL DATA ON ALL NODES WILL BE LOST!!\nDestroy cluster %s (y/N): " % cluster_name
         response = raw_input(msg)
@@ -1101,7 +1119,7 @@ def real_main():
 
     elif action == "login":
         (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
-        master = master_nodes[0].public_dns_name
+        master = get_connection_address(master_nodes[0])
         print "Logging into master " + master + "..."
         proxy_opt = []
         if opts.proxy_port is not None:
@@ -1125,7 +1143,7 @@ def real_main():
 
     elif action == "get-master":
         (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
-        print master_nodes[0].public_dns_name
+        print get_connection_address(master_nodes[0])
 
     elif action == "stop":
         response = raw_input(

From c3960627c3409575c2772e62b46109fef4f28a52 Mon Sep 17 00:00:00 2001
From: Vladimir Grigor
Date: Wed, 14 Jan 2015 14:57:27 +0200
Subject: [PATCH 2/2] Fix for SPARK-5242: make sure the address used to connect actually resolves

---
 ec2/spark_ec2.py | 31 ++++++++++++++++++++-----------
 1 file changed, 20 insertions(+), 11 deletions(-)

diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 8f01c141b936..112a6631f440 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -614,20 +614,28 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
 
 def get_connection_address(instance):
     """
-    Return the DNS name if possible, otherwise return an IP address.
-    It is common that VPC instances do not have a public DNS name, a public IP, or a private DNS name.
+    Return some address that can be used to connect to the instance.
+    It is common that a VPC instance does not have a public DNS name, a public IP, or a private DNS name.
+    A private DNS name may be problematic to use if your VPN does not resolve private DNS.
     But a private IP is always there.
 
+    TODO: maybe add a static cache of instance id to resolved address
+
     :type instance: :class:`ec2.instance.Instance`
-    :return: string -- the first non-None of public_dns_name, ip_address, private_dns_name, private_ip_address
+    :return: string
     """
+    import socket
+    addresses = [instance.public_dns_name, instance.ip_address, instance.private_dns_name]
+    for address in addresses:
+        if address:
+            try:
+                # try to resolve the address
+                socket.gethostbyaddr(address)
+                return address
+            except (socket.gaierror, socket.herror):
+                pass
 
-    address = instance.public_dns_name or instance.ip_address or instance.private_dns_name or instance.private_ip_address
-
-    if address is None:
-        raise RuntimeError('Cannot get address from instance %s' % instance)
-
-    return address
+    return instance.private_ip_address
 
 
 # Deploy configuration files and run setup scripts on a newly launched
@@ -647,8 +655,9 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
         dot_ssh_tar = ssh_read(master, opts, ['tar', 'c', '.ssh'])
         print "Transferring cluster's SSH key to slaves..."
         for slave in slave_nodes:
-            print get_connection_address(slave)
-            ssh_write(get_connection_address(slave), opts, ['tar', 'x'], dot_ssh_tar)
+            slave_address = get_connection_address(slave)
+            print slave_address
+            ssh_write(slave_address, opts, ['tar', 'x'], dot_ssh_tar)
 
     modules = ['spark', 'ephemeral-hdfs', 'persistent-hdfs',
                'mapreduce', 'spark-standalone', 'tachyon']
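
A note for reviewers, not part of the patch: the sketch below restates the final get_connection_address logic from the second commit in a standalone form so it can be exercised without boto or a live EC2 cluster. FakeInstance is a hypothetical stand-in for boto's ec2.instance.Instance (only the address attributes read here are modelled), and the sample values are illustrative.

import socket
from collections import namedtuple

# Hypothetical stand-in for boto's ec2.instance.Instance: only the four
# address attributes that get_connection_address reads are modelled.
FakeInstance = namedtuple(
    "FakeInstance",
    ["public_dns_name", "ip_address", "private_dns_name", "private_ip_address"])


def get_connection_address(instance):
    # Same strategy as the second commit: prefer public DNS, then public IP,
    # then private DNS, but only accept a candidate that actually resolves;
    # otherwise fall back to the private IP, which is always present.
    for address in [instance.public_dns_name, instance.ip_address, instance.private_dns_name]:
        if address:
            try:
                socket.gethostbyaddr(address)  # forward lookup, then reverse lookup
                return address
            except (socket.gaierror, socket.herror):
                pass
    return instance.private_ip_address


if __name__ == "__main__":
    # A VPC-style instance: no public DNS or public IP, and a private DNS name
    # that a resolver outside the VPC normally cannot see, so this typically
    # prints the private IP (10.0.0.12).
    vpc_instance = FakeInstance(
        public_dns_name="",
        ip_address=None,
        private_dns_name="ip-10-0-0-12.ec2.internal",
        private_ip_address="10.0.0.12")
    print(get_connection_address(vpc_instance))

Note that socket.gethostbyaddr also performs a reverse lookup, so a candidate that resolves forward but has no PTR record is skipped in favour of the next one; that is the behaviour of the patch as written, not an addition here.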
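The second commit also leaves a TODO about caching the resolved address per instance id. Purely as an illustration of that idea (none of the names below exist in spark_ec2.py), a minimal memoisation wrapper could look like the following; it assumes a get_connection_address function such as the one in the patch or in the sketch above, and relies on boto Instance objects exposing an id attribute.

# Illustrative only: a module-level cache keyed by instance id, so repeated
# ssh/scp calls during cluster setup do not re-resolve DNS for the same node.
_ADDRESS_CACHE = {}


def cached_connection_address(instance):
    # instance.id is the EC2 instance id (e.g. "i-1234abcd"); fall back to
    # get_connection_address on a cache miss and remember the result.
    if instance.id not in _ADDRESS_CACHE:
        _ADDRESS_CACHE[instance.id] = get_connection_address(instance)
    return _ADDRESS_CACHE[instance.id]

A cache like this would be safe within a single spark_ec2.py run, where the same instances are contacted many times, but it should not outlive the run, since public addresses can change across stop/start cycles.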