diff --git a/dataproc/README.md b/dataproc/README.md index 6f0a0390e2a9..6bf819ccfa55 100644 --- a/dataproc/README.md +++ b/dataproc/README.md @@ -35,14 +35,19 @@ To run list_clusters.py: python list_clusters.py --region=us-central1 -To run create_cluster_and_submit_job, first create a GCS bucket, from the Cloud Console or with +To run submit_job_to_cluster.py, first create a GCS bucket, from the Cloud Console or with gsutil: gsutil mb gs:// -Then run: +Then, if you want to rely on an existing cluster, run: - python create_cluster_and_submit_job.py --project_id= --zone=us-central1-b --cluster_name=testcluster --gcs_bucket= + python submit_job_to_cluster.py --project_id= --zone=us-central1-b --cluster_name=testcluster --gcs_bucket= + +Otherwise, if you want the script to create a new cluster for you: + + python submit_job_to_cluster.py --project_id= --zone=us-central1-b --cluster_name=testcluster --gcs_bucket= --create_new_cluster + This will setup a cluster, upload the PySpark file, submit the job, print the result, then delete the cluster. diff --git a/dataproc/dataproc_e2e_test.py b/dataproc/dataproc_e2e_test.py index dcb836c306bb..d7e9c522074e 100644 --- a/dataproc/dataproc_e2e_test.py +++ b/dataproc/dataproc_e2e_test.py @@ -20,7 +20,7 @@ from gcp_devrel.testing.flaky import flaky -import create_cluster_and_submit_job +import submit_job_to_cluster PROJECT = os.environ['GCLOUD_PROJECT'] BUCKET = os.environ['CLOUD_STORAGE_BUCKET'] @@ -30,6 +30,6 @@ @flaky def test_e2e(): - output = create_cluster_and_submit_job.main( + output = submit_job_to_cluster.main( PROJECT, ZONE, CLUSTER_NAME, BUCKET) assert b"['Hello,', 'dog', 'elephant', 'panther', 'world!']" in output diff --git a/dataproc/create_cluster_and_submit_job.py b/dataproc/submit_job_to_cluster.py similarity index 76% rename from dataproc/create_cluster_and_submit_job.py rename to dataproc/submit_job_to_cluster.py index f165e145b67d..b1230020415e 100644 --- a/dataproc/create_cluster_and_submit_job.py +++ b/dataproc/submit_job_to_cluster.py @@ -19,8 +19,6 @@ from google.cloud import storage import googleapiclient.discovery -# Currently only the "global" region is supported -REGION = 'global' DEFAULT_FILENAME = 'pyspark_sort.py' @@ -36,6 +34,14 @@ def get_pyspark_file(filename): return f, os.path.basename(filename) +def get_region_from_zone(zone): + try: + region_as_list = zone.split('-')[:-1] + return '-'.join(region_as_list) + except (AttributeError, IndexError, ValueError): + raise ValueError('Invalid zone provided, please check your input.') + + def upload_pyspark_file(project_id, bucket_name, filename, file): """Uploads the PySpark file in this directory to the configured input bucket.""" @@ -59,8 +65,8 @@ def download_output(project_id, cluster_id, output_bucket, job_id): # [START create_cluster] -def create_cluster(dataproc, project, cluster_name, zone): - print('Creating cluster.') +def create_cluster(dataproc, project, zone, region, cluster_name): + print('Creating cluster...') zone_uri = \ 'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format( project, zone) @@ -75,19 +81,19 @@ def create_cluster(dataproc, project, cluster_name, zone): } result = dataproc.projects().regions().clusters().create( projectId=project, - region=REGION, + region=region, body=cluster_data).execute() return result # [END create_cluster] -def wait_for_cluster_creation(dataproc, project_id, cluster_name, zone): - print('Waiting for cluster creation') +def wait_for_cluster_creation(dataproc, project_id, region, cluster_name): + print('Waiting for cluster creation...') while True: result = dataproc.projects().regions().clusters().list( projectId=project_id, - region=REGION).execute() + region=region).execute() cluster_list = result['clusters'] cluster = [c for c in cluster_list @@ -100,10 +106,10 @@ def wait_for_cluster_creation(dataproc, project_id, cluster_name, zone): # [START list_clusters_with_detail] -def list_clusters_with_details(dataproc, project): +def list_clusters_with_details(dataproc, project, region): result = dataproc.projects().regions().clusters().list( projectId=project, - region=REGION).execute() + region=region).execute() cluster_list = result['clusters'] for cluster in cluster_list: print("{} - {}" @@ -120,7 +126,7 @@ def get_cluster_id_by_name(cluster_list, cluster_name): # [START submit_pyspark_job] -def submit_pyspark_job(dataproc, project, cluster_name, bucket_name, filename): +def submit_pyspark_job(dataproc, project, region, cluster_name, bucket_name, filename): """Submits the Pyspark job to the cluster, assuming `filename` has already been uploaded to `bucket_name`""" job_details = { @@ -136,7 +142,7 @@ def submit_pyspark_job(dataproc, project, cluster_name, bucket_name, filename): } result = dataproc.projects().regions().jobs().submit( projectId=project, - region=REGION, + region=region, body=job_details).execute() job_id = result['reference']['jobId'] print('Submitted job ID {}'.format(job_id)) @@ -145,29 +151,29 @@ def submit_pyspark_job(dataproc, project, cluster_name, bucket_name, filename): # [START delete] -def delete_cluster(dataproc, project, cluster): +def delete_cluster(dataproc, project, region, cluster): print('Tearing down cluster') result = dataproc.projects().regions().clusters().delete( projectId=project, - region=REGION, + region=region, clusterName=cluster).execute() return result # [END delete] # [START wait] -def wait_for_job(dataproc, project, job_id): +def wait_for_job(dataproc, project, region, job_id): print('Waiting for job to finish...') while True: result = dataproc.projects().regions().jobs().get( projectId=project, - region=REGION, + region=region, jobId=job_id).execute() # Handle exceptions if result['status']['state'] == 'ERROR': raise Exception(result['status']['details']) elif result['status']['state'] == 'DONE': - print('Job finished') + print('Job finished.') return result # [END wait] @@ -181,34 +187,40 @@ def get_client(): # [END get_client] -def main(project_id, zone, cluster_name, bucket_name, pyspark_file=None): +def main(project_id, zone, cluster_name, bucket_name, pyspark_file=None, create_new_cluster=True): dataproc = get_client() + region = get_region_from_zone(zone) try: if pyspark_file: spark_file, spark_filename = get_pyspark_file(pyspark_file) else: spark_file, spark_filename = get_default_pyspark_file() - create_cluster(dataproc, project_id, cluster_name, zone) - wait_for_cluster_creation(dataproc, project_id, cluster_name, zone) - upload_pyspark_file(project_id, bucket_name, - spark_filename, spark_file) + if create_new_cluster: + create_cluster(dataproc, project_id, zone, region, cluster_name) + wait_for_cluster_creation(dataproc, project_id, region, cluster_name) + + upload_pyspark_file( + project_id, bucket_name, spark_filename, spark_file) + cluster_list = list_clusters_with_details( - dataproc, project_id)['clusters'] + dataproc, project_id, region)['clusters'] (cluster_id, output_bucket) = ( get_cluster_id_by_name(cluster_list, cluster_name)) + # [START call_submit_pyspark_job] job_id = submit_pyspark_job( - dataproc, project_id, cluster_name, bucket_name, spark_filename) + dataproc, project_id, region, cluster_name, bucket_name, spark_filename) # [END call_submit_pyspark_job] - wait_for_job(dataproc, project_id, job_id) + wait_for_job(dataproc, project_id, region, job_id) output = download_output(project_id, cluster_id, output_bucket, job_id) print('Received job output {}'.format(output)) return output finally: - delete_cluster(dataproc, project_id, cluster_name) + if create_new_cluster: + delete_cluster(dataproc, project_id, region, cluster_name) spark_file.close() @@ -220,15 +232,17 @@ def main(project_id, zone, cluster_name, bucket_name, pyspark_file=None): parser.add_argument( '--project_id', help='Project ID you want to access.', required=True), parser.add_argument( - '--zone', help='Region to create clusters in', required=True) + '--zone', help='Zone to create clusters in/connect to', required=True) parser.add_argument( - '--cluster_name', help='Name of the cluster to create', required=True) + '--cluster_name', help='Name of the cluster to create/connect to', required=True) parser.add_argument( '--gcs_bucket', help='Bucket to upload Pyspark file to', required=True) parser.add_argument( '--pyspark_file', help='Pyspark filename. Defaults to pyspark_sort.py') + parser.add_argument( + '--create_new_cluster', action='store_true', help='States whether the cluster should be created or not') args = parser.parse_args() main( - args.project_id, args.zone, - args.cluster_name, args.gcs_bucket, args.pyspark_file) + args.project_id, args.zone, args.cluster_name, + args.gcs_bucket, args.pyspark_file, args.create_new_cluster)