From 7aa92d2bb663390a9bdce1c1a958af2f94f8edab Mon Sep 17 00:00:00 2001
From: Bill Prin
Date: Fri, 29 Apr 2016 13:37:44 -0700
Subject: [PATCH] Add Dataproc Sample

---
 dataproc/README.md                        |  60 ++++++
 dataproc/create_cluster.py                |   0
 dataproc/create_cluster_and_submit_job.py | 223 ++++++++++++++++++++++
 dataproc/dataproc_e2e_test.py             |  32 ++++
 dataproc/list_clusters.py                 |  61 ++++++
 dataproc/pyspark_sort.py                  |  26 +++
 dataproc/requirements.txt                 |   2 +
 7 files changed, 404 insertions(+)
 delete mode 100644 dataproc/create_cluster.py
 create mode 100644 dataproc/create_cluster_and_submit_job.py
 create mode 100644 dataproc/dataproc_e2e_test.py
 create mode 100644 dataproc/list_clusters.py
 create mode 100644 dataproc/pyspark_sort.py
 create mode 100644 dataproc/requirements.txt

diff --git a/dataproc/README.md b/dataproc/README.md
index e69de29bb2d1..5ff626b400be 100644
--- a/dataproc/README.md
+++ b/dataproc/README.md
@@ -0,0 +1,60 @@
+# Cloud Dataproc API Example
+
+Sample command-line programs for interacting with the Cloud Dataproc API.
+
+Note that while this sample demonstrates interacting with Dataproc via the API, the
+functionality demonstrated here could also be accomplished using the Cloud Console
+or the gcloud CLI.
+
+`list_clusters.py` is a simple command-line program to demonstrate connecting to the
+Dataproc API and listing the clusters in a region.
+
+`create_cluster_and_submit_job.py` demonstrates how to create a cluster, submit the
+`pyspark_sort.py` job, download the output from Google Cloud Storage, and print the
+result.
+
+## Prerequisites to run locally:
+
+* [pip](https://pypi.python.org/pypi/pip)
+
+Go to the [Google Cloud Console](https://console.cloud.google.com).
+
+Under API Manager, search for the Google Cloud Dataproc API and enable it.
+
+## Set Up Your Local Dev Environment
+
+To install, run the following commands. If you want to use
+[virtualenv](https://virtualenv.readthedocs.org/en/latest/) (recommended), run the
+commands within a virtualenv.
+
+    pip install -r requirements.txt
+
+Create local credentials by running the following command and following the
+oauth2 flow:
+
+    gcloud beta auth application-default login
+
+To run list_clusters.py:
+
+    python list_clusters.py --project_id=<your-project-id> --zone=us-central1-b
+
+To run create_cluster_and_submit_job.py, first create a Google Cloud Storage
+bucket, from the Cloud Console or with gsutil:
+
+    gsutil mb gs://<your-bucket-name>
+
+Then run:
+
+    python create_cluster_and_submit_job.py --project_id=<your-project-id> --zone=us-central1-b --cluster_name=testcluster --gcs_bucket=<your-bucket-name>
+
+This will set up a cluster, upload the PySpark file, submit the job, print the
+result, and then delete the cluster.
+
+## Running on GCE, GAE, or other environments
+
+On Google App Engine, the credentials should be found automatically.
+
+On Google Compute Engine, the credentials should be found automatically, but they
+require that you create the instance with the correct scopes:
+
+    gcloud compute instances create --scopes="https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/compute,https://www.googleapis.com/auth/compute.readonly" test-instance
+
+If you did not create the instance with the right scopes, you can still upload a
+JSON service account key and set GOOGLE_APPLICATION_CREDENTIALS as described below.
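The README's last step points GOOGLE_APPLICATION_CREDENTIALS at a downloaded service account key. As a minimal sketch of how these samples then pick the key up (the key path is a hypothetical placeholder):

```python
import os

# Hypothetical path; point this at the service account JSON key you downloaded
# from the Cloud Console.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/path/to/service-account-key.json'

from oauth2client.client import GoogleCredentials

# get_application_default() reads GOOGLE_APPLICATION_CREDENTIALS when it is
# set, and otherwise falls back to the gcloud local credentials created above.
credentials = GoogleCredentials.get_application_default()
```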
diff --git a/dataproc/create_cluster.py b/dataproc/create_cluster.py
deleted file mode 100644
index e69de29bb2d1..000000000000
diff --git a/dataproc/create_cluster_and_submit_job.py b/dataproc/create_cluster_and_submit_job.py
new file mode 100644
index 000000000000..a6e5af7567b2
--- /dev/null
+++ b/dataproc/create_cluster_and_submit_job.py
@@ -0,0 +1,223 @@
+#!/usr/bin/env python
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Sample command-line program that creates a Cloud Dataproc cluster,
+submits a PySpark job to it, prints the job output, and tears the cluster
+down."""
+
+import argparse
+import os
+
+from apiclient import discovery
+from gcloud import storage
+from oauth2client.client import GoogleCredentials
+
+# Currently only the "global" region is supported
+REGION = 'global'
+
+FILENAME = 'pyspark_sort.py'
+
+
+def get_pyspark_file():
+    """Gets the PySpark file from this directory."""
+    current_dir = os.path.dirname(os.path.abspath(__file__))
+    return open(os.path.join(current_dir, FILENAME))
+
+
+def upload_pyspark_file(project_id, bucket_name):
+    """Uploads the PySpark file in this directory to the configured
+    input bucket."""
+    print("Uploading pyspark file to GCS")
+    client = storage.Client(project=project_id)
+    bucket = client.get_bucket(bucket_name)
+    blob = bucket.blob(FILENAME)
+    blob.upload_from_file(get_pyspark_file())
+
+
+def download_output(project_id, cluster_id, output_bucket, job_id):
+    """Downloads the output file from Cloud Storage and returns it as a
+    string."""
+    print("Downloading output file")
+    client = storage.Client(project=project_id)
+    bucket = client.get_bucket(output_bucket)
+    output_blob = (
+        'google-cloud-dataproc-metainfo/{}/jobs/{}/driveroutput.000000000'
+        .format(cluster_id, job_id))
+    return bucket.blob(output_blob).download_as_string()
+
+
+# [START create_cluster]
+def create_cluster(dataproc, project, cluster_name, zone):
+    print("Creating cluster.")
+    zone_uri = \
+        'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(
+            project, zone)
+    cluster_data = {
+        'projectId': project,
+        'clusterName': cluster_name,
+        'config': {
+            'gceClusterConfig': {
+                'zoneUri': zone_uri
+            }
+        }
+    }
+    result = dataproc.projects().regions().clusters().create(
+        projectId=project,
+        region=REGION,
+        body=cluster_data).execute()
+    return result
+# [END create_cluster]
+
+
+def wait_for_cluster_creation(dataproc, project_id, cluster_name, zone):
+    print("Waiting for cluster creation")
+
+    while True:
+        result = dataproc.projects().regions().clusters().list(
+            projectId=project_id,
+            region=REGION).execute()
+        cluster_list = result['clusters']
+        cluster = [c
+                   for c in cluster_list
+                   if c['clusterName'] == cluster_name][0]
+        if cluster['status']['state'] == 'ERROR':
+            raise Exception(result['error'])
+        if cluster['status']['state'] == 'RUNNING':
+            print("Cluster created.")
+            break
+
+
+# [START list_clusters_with_detail]
+def list_clusters_with_details(dataproc, project):
+    result = dataproc.projects().regions().clusters().list(
+        projectId=project,
+        region=REGION).execute()
+    cluster_list = result['clusters']
+    for cluster in cluster_list:
+        print("{} - {}"
+              .format(cluster['clusterName'], cluster['status']['state']))
+    return result
+# [END list_clusters_with_detail]
+
+
+def get_cluster_id_by_name(cluster_list, cluster_name):
+    """Helper function to retrieve the ID and output bucket of a cluster by
+    name."""
+    cluster = [c for c in cluster_list if c['clusterName'] == cluster_name][0]
+    return cluster['clusterUuid'], cluster['config']['configBucket']
+
+
+# [START submit_pyspark_job]
+def submit_pyspark_job(dataproc, project, cluster_name, bucket_name):
+    """Submits the PySpark job to the cluster, assuming `FILENAME` has
+    already been uploaded to `bucket_name`."""
+    job_details = {
+        'projectId': project,
+        'job': {
+            'placement': {
+                'clusterName': cluster_name
+            },
+            'pysparkJob': {
+                'mainPythonFileUri': 'gs://{}/{}'.format(bucket_name, FILENAME)
+            }
+        }
+    }
+    result = dataproc.projects().regions().jobs().submit(
+        projectId=project,
+        region=REGION,
+        body=job_details).execute()
+    job_id = result['reference']['jobId']
+    print("Submitted job ID {}".format(job_id))
+    return job_id
+# [END submit_pyspark_job]
+
+
+# [START delete]
+def delete_cluster(dataproc, project, cluster):
+    print("Tearing down cluster")
+    result = dataproc.projects().regions().clusters().delete(
+        projectId=project,
+        region=REGION,
+        clusterName=cluster).execute()
+    return result
+# [END delete]
+
+
+# [START wait]
+def wait_for_job(dataproc, project, job_id):
+    print('Waiting for job to finish...')
+    while True:
+        result = dataproc.projects().regions().jobs().get(
+            projectId=project,
+            region=REGION,
+            jobId=job_id).execute()
+        # Raise if the job errored out; return once it completes.
+        if result['status']['state'] == 'ERROR':
+            raise Exception(result['error'])
+        elif result['status']['state'] == 'DONE':
+            print("Job finished")
+            return result
+# [END wait]
+
+
+# [START get_client]
+def get_client():
+    """Builds an http client authenticated with the service account
+    credentials."""
+    credentials = GoogleCredentials.get_application_default()
+    dataproc = discovery.build('dataproc', 'v1', credentials=credentials)
+    return dataproc
+# [END get_client]
+
+
+def create_and_submit_main(project_id, zone, cluster_name, bucket_name):
+    dataproc = get_client()
+    try:
+        create_cluster(dataproc, project_id, cluster_name, zone)
+        wait_for_cluster_creation(dataproc, project_id, cluster_name, zone)
+        upload_pyspark_file(project_id, bucket_name)
+        cluster_list = list_clusters_with_details(
+            dataproc, project_id)['clusters']
+
+        (cluster_id, output_bucket) = (
+            get_cluster_id_by_name(cluster_list, cluster_name))
+        job_id = submit_pyspark_job(
+            dataproc, project_id, cluster_name, bucket_name)
+        wait_for_job(dataproc, project_id, job_id)
+
+        output = download_output(project_id, cluster_id, output_bucket, job_id)
+        print("Received job output: {}".format(output))
+        return output
+    finally:
+        delete_cluster(dataproc, project_id, cluster_name)
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(
+        description=__doc__,
+        formatter_class=argparse.RawDescriptionHelpFormatter
+    )
+    parser.add_argument(
+        '--project_id', help='Project ID you want to access.', required=True)
+    parser.add_argument(
+        '--zone', help='Zone to create the cluster in.', required=True)
+    parser.add_argument(
+        '--cluster_name', help='Name of the cluster to create.', required=True)
+    parser.add_argument(
+        '--gcs_bucket', help='Bucket to upload the PySpark file to.',
+        required=True)
+
+    args = parser.parse_args()
+    create_and_submit_main(
+        args.project_id, args.zone,
+        args.cluster_name, args.gcs_bucket)
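Taken together, `create_and_submit_main` can also be driven from another script. A minimal sketch of such a call (all four argument values are hypothetical placeholders):

```python
from create_cluster_and_submit_job import create_and_submit_main

# Each value below is a hypothetical placeholder; substitute your own.
output = create_and_submit_main(
    'my-project-id',      # GCP project that will own the cluster
    'us-central1-b',      # zone for the cluster's Compute Engine instances
    'testcluster',        # name of the transient cluster
    'my-staging-bucket')  # GCS bucket that receives pyspark_sort.py
print(output)
```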
diff --git a/dataproc/dataproc_e2e_test.py b/dataproc/dataproc_e2e_test.py
new file mode 100644
index 000000000000..4c69c28bf9b6
--- /dev/null
+++ b/dataproc/dataproc_e2e_test.py
@@ -0,0 +1,32 @@
+#!/usr/bin/env python
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Integration tests for Dataproc samples.
+
+Creates a Dataproc cluster, uploads a pyspark file to Google Cloud Storage,
+submits a job to Dataproc that runs the pyspark file, then downloads
+the output logs from Cloud Storage and verifies the expected output."""
+
+from create_cluster_and_submit_job import create_and_submit_main
+from gcp.testing.flaky import flaky
+
+CLUSTER_NAME = 'testcluster2'
+ZONE = 'us-central1-b'
+
+
+@flaky
+def test_e2e(cloud_config):
+    output = create_and_submit_main(
+        cloud_config.project, ZONE, CLUSTER_NAME, cloud_config.storage_bucket)
+    assert "['Hello,', 'dog', 'elephant', 'panther', 'world!']" in output
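The string asserted above is the output of `pyspark_sort.py` (shown below), and its ordering may look odd at first glance. A short standalone check of the same sort, runnable in plain Python without Spark:

```python
# Python's default string comparison is by code point, and uppercase letters
# precede lowercase ones in ASCII, so 'Hello,' sorts ahead of 'dog'.
words = sorted(['Hello,', 'world!', 'dog', 'elephant', 'panther'])
assert words == ['Hello,', 'dog', 'elephant', 'panther', 'world!']
print(words)
```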
+ +""" Sample command-line program for listing Google Dataproc Clusters +""" + +import argparse + +from apiclient import discovery +from oauth2client.client import GoogleCredentials + +# Currently only the "global" region is supported +REGION = "global" + + +# [START list_clusters] +def list_clusters(dataproc, project): + result = dataproc.projects().regions().clusters().list( + projectId=project, + region=REGION).execute() + return result +# [END list_clusters] + + +# [START get_client] +def get_client(): + """Builds an http client authenticated with the service account + credentials.""" + credentials = GoogleCredentials.get_application_default() + dataproc = discovery.build('dataproc', 'v1', credentials=credentials) + return dataproc +# [END get_client] + + +def main(project_id, zone): + dataproc = get_client() + result = list_clusters(dataproc, project_id) + print(result) + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter + ) + parser.add_argument( + '--project_id', help='Project ID you want to access.', required=True), + parser.add_argument( + '--zone', help='Region to create clusters in', required=True) + + args = parser.parse_args() + main(args.project_id, args.zone) diff --git a/dataproc/pyspark_sort.py b/dataproc/pyspark_sort.py new file mode 100644 index 000000000000..7fface1db9f3 --- /dev/null +++ b/dataproc/pyspark_sort.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" Sample pyspark script to be uploaded to Cloud Storage and run on +Cloud Dataproc. + +Note this file is not intended to be run directly, but run inside a PySpark +environment. +""" + +import pyspark + +sc = pyspark.SparkContext() +rdd = sc.parallelize(['Hello,', 'world!', 'dog', 'elephant', 'panther']) +words = sorted(rdd.collect()) +print words diff --git a/dataproc/requirements.txt b/dataproc/requirements.txt new file mode 100644 index 000000000000..50278dbe8067 --- /dev/null +++ b/dataproc/requirements.txt @@ -0,0 +1,2 @@ +google-api-python-client==1.5.0 +gcloud==0.13.0