Add Dataproc Sample
Bill Prin committed Apr 29, 2016
1 parent 4258361 commit 7aa92d2
Showing 7 changed files with 404 additions and 0 deletions.
60 changes: 60 additions & 0 deletions dataproc/README.md
@@ -0,0 +1,60 @@
# Cloud Dataproc API Example

Sample command-line programs for interacting with the Cloud Dataproc API.

Note that while this sample demonstrates interacting with Dataproc via the API, the functionality
demonstrated here could also be accomplished using the Cloud Console or the gcloud CLI.
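
For example, a reasonably recent gcloud release can list clusters with a command along
these lines (flag requirements vary slightly between gcloud versions):

    gcloud dataproc clusters list --region=<region>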

`list_clusters.py` is a simple command-line program that demonstrates connecting to the
Dataproc API and listing the clusters in a region.

`create_cluster_and_submit_job.py` demonstrates how to create a cluster, submit the
`pyspark_sort.py` job, download the output from Google Cloud Storage, and print the result.
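
`pyspark_sort.py` itself is a very small Spark job. As a rough sketch of what such a job
looks like (the word list below is illustrative, borrowed from the output the integration
test expects; the actual file in this sample may differ):

    import pyspark

    # Sort a short list of words in a Spark RDD and print the result.
    sc = pyspark.SparkContext()
    rdd = sc.parallelize(['Hello,', 'world!', 'dog', 'elephant', 'panther'])
    words = sorted(rdd.collect())
    print(words)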

## Prerequisites to run locally:

* [pip](https://pypi.python.org/pypi/pip)

Go to the [Google Cloud Console](https://console.cloud.google.com).

Under API Manager, search for the Google Cloud Dataproc API and enable it.


## Set Up Your Local Dev Environment
To install, run the following commands. If you want to use [virtualenv](https://virtualenv.readthedocs.org/en/latest/)
(recommended), run the commands within a virtualenv.
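
Creating and activating a virtualenv typically looks like this (assuming the
`virtualenv` tool is already installed; the environment name `env` is arbitrary):

    virtualenv env
    source env/bin/activate

Then install the dependencies: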

    pip install -r requirements.txt

Create local credentials by running the following command and following the oauth2 flow:

    gcloud beta auth application-default login

To run list_clusters.py:

    python list_clusters.py --project_id=<YOUR-PROJECT-ID> --zone=us-central1-b


To run `create_cluster_and_submit_job.py`, first create a GCS bucket, either from the
Cloud Console or with gsutil:

    gsutil mb gs://<your-input-bucket-name>

Then run:

    python create_cluster_and_submit_job.py --project_id=<your-project-id> --zone=us-central1-b --cluster_name=testcluster --gcs_bucket=<your-input-bucket-name>

This will set up a cluster, upload the PySpark file, submit the job, print the result, and
then delete the cluster.
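
If everything works, the job output printed at the end should include the sorted word
list that the integration test in `dataproc_e2e_test.py` checks for:

    ['Hello,', 'dog', 'elephant', 'panther', 'world!']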

## Running on GCE, GAE, or other environments

On Google App Engine, the credentials should be found automatically.

On Google Compute Engine, the credentials should also be found automatically, but this
requires that the instance was created with the correct scopes, for example:

    gcloud compute instances create --scopes="https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/compute,https://www.googleapis.com/auth/compute.readonly" test-instance

If you did not create the instance with the right scopes, you can still upload a JSON
service account key and set the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to
point to it, as shown below.
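
For example (the key path below is a placeholder for wherever you saved the downloaded
JSON key):

    export GOOGLE_APPLICATION_CREDENTIALS=/path/to/your-key.json
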
Empty file removed dataproc/create_cluster.py
Empty file.
223 changes: 223 additions & 0 deletions dataproc/create_cluster_and_submit_job.py
@@ -0,0 +1,223 @@
#!/usr/bin/env python
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

""" Sample command-line program for listing Google Dataproc Clusters"""

import argparse
import os
from apiclient import discovery
from gcloud import storage
from oauth2client.client import GoogleCredentials

# Currently only the "global" region is supported
REGION = "global"

FILENAME = 'pyspark_sort.py'


def get_pyspark_file():
    """Gets the PySpark file from this directory"""
    current_dir = os.path.dirname(os.path.abspath(__file__))

    return open(os.path.join(current_dir, FILENAME))


def upload_pyspark_file(project_id, bucket_name):
    """Uploads the PySpark file in this directory to the configured
    input bucket."""

    print("Uploading pyspark file to GCS")
    client = storage.Client(project=project_id)
    bucket = client.get_bucket(bucket_name)
    blob = bucket.blob(FILENAME)
    blob.upload_from_file(get_pyspark_file())


def download_output(project_id, cluster_id, output_bucket, job_id):
    """Downloads the output file from Cloud Storage and returns it as a
    string."""
    print("Downloading output file")
    client = storage.Client(project=project_id)
    bucket = client.get_bucket(output_bucket)

    output_blob = (
        'google-cloud-dataproc-metainfo/{}/jobs/{}/driveroutput.000000000'
        .format(cluster_id, job_id))
    return bucket.blob(output_blob).download_as_string()


# [START create_cluster]
def create_cluster(dataproc, project, cluster_name, zone):
    print("Creating cluster.")
    zone_uri = 'https://www.googleapis.com/compute/v1/projects/' + \
        project + '/zones/' + zone
    cluster_data = {
        'projectId': project,
        'clusterName': cluster_name,
        'config': {
            'gceClusterConfig': {
                'zoneUri': zone_uri
            }
        }
    }
    result = dataproc.projects().regions().clusters().create(
        projectId=project,
        region=REGION,
        body=cluster_data).execute()
    return result
# [END create_cluster]


def wait_for_cluster_creation(dataproc, project_id, cluster_name, zone):
    print("Waiting for cluster creation")

    while True:
        result = dataproc.projects().regions().clusters().list(
            projectId=project_id,
            region=REGION).execute()
        cluster_list = result['clusters']
        cluster = [c
                   for c in cluster_list
                   if c['clusterName'] == cluster_name][0]
        if cluster['status']['state'] == 'ERROR':
            raise Exception(result['error'])
        if cluster['status']['state'] == 'RUNNING':
            print("Cluster created.")
            break


# [START list_clusters_with_detail]
def list_clusters_with_details(dataproc, project):
    result = dataproc.projects().regions().clusters().list(
        projectId=project,
        region=REGION).execute()
    cluster_list = result['clusters']
    for cluster in cluster_list:
        print("{} - {}"
              .format(cluster['clusterName'], cluster['status']['state']))
    return result
# [END list_clusters_with_detail]


def get_cluster_id_by_name(cluster_list, cluster_name):
    """Helper function to retrieve the ID and output bucket of a cluster by
    name."""
    cluster = [c for c in cluster_list if c['clusterName'] == cluster_name][0]
    return cluster['clusterUuid'], cluster['config']['configBucket']


# [START submit_pyspark_job]
def submit_pyspark_job(dataproc, project, cluster_name, bucket_name):
    """Submits the Pyspark job to the cluster, assuming `FILENAME` has
    already been uploaded to `bucket_name`"""
    job_details = {
        "projectId": project,
        "job": {
            "placement": {
                "clusterName": cluster_name
            },

            "pysparkJob": {
                "mainPythonFileUri": 'gs://{}/{}'.format(bucket_name, FILENAME)
            }
        }
    }
    result = dataproc.projects().regions().jobs().submit(
        projectId=project,
        region=REGION,
        body=job_details).execute()
    job_id = result['reference']['jobId']
    print("Submitted job ID {}".format(job_id))
    return job_id
# [END submit_pyspark_job]


# [START delete]
def delete_cluster(dataproc, project, cluster):
    print("Tearing down cluster")
    result = dataproc.projects().regions().clusters().delete(
        projectId=project,
        region=REGION,
        clusterName=cluster).execute()
    return result
# [END delete]


# [START wait]
def wait_for_job(dataproc, project, job_id):
    print('Waiting for job to finish...')
    while True:
        result = dataproc.projects().regions().jobs().get(
            projectId=project,
            region=REGION,
            jobId=job_id).execute()
        # Raise an exception if the job hit an error; return once it is done.
        if result['status']['state'] == 'ERROR':
            raise Exception(result['error'])
        elif result['status']['state'] == 'DONE':
            print("Job finished")
            return result
# [END wait]


# [START get_client]
def get_client():
    """Builds a client to the Dataproc API using Application Default
    Credentials."""
    credentials = GoogleCredentials.get_application_default()
    dataproc = discovery.build('dataproc', 'v1', credentials=credentials)
    return dataproc
# [END get_client]


def create_and_submit_main(project_id, zone, cluster_name, bucket_name):
    dataproc = get_client()
    try:
        create_cluster(dataproc, project_id, cluster_name, zone)
        wait_for_cluster_creation(dataproc, project_id, cluster_name, zone)
        upload_pyspark_file(project_id, bucket_name)
        cluster_list = list_clusters_with_details(
            dataproc, project_id)['clusters']

        (cluster_id, output_bucket) = (
            get_cluster_id_by_name(cluster_list, cluster_name))
        job_id = submit_pyspark_job(
            dataproc, project_id, cluster_name, bucket_name)
        wait_for_job(dataproc, project_id, job_id)

        output = download_output(project_id, cluster_id, output_bucket, job_id)
        print("Received job output {}".format(output))
        return output
    finally:
        delete_cluster(dataproc, project_id, cluster_name)



if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        '--project_id', help='Project ID you want to access.', required=True)
    parser.add_argument(
        '--zone', help='Zone to create clusters in.', required=True)
    parser.add_argument(
        '--cluster_name', help='Name of the cluster to create.',
        required=True)
    parser.add_argument(
        '--gcs_bucket', help='Bucket to upload the PySpark file to.',
        required=True)

    args = parser.parse_args()
    create_and_submit_main(
        args.project_id, args.zone,
        args.cluster_name, args.gcs_bucket)
32 changes: 32 additions & 0 deletions dataproc/dataproc_e2e_test.py
@@ -0,0 +1,32 @@
#!/usr/bin/env python
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

""" Integration tests for Dataproc samples.
Creates a Dataproc cluster, uploads a pyspark file to Google Cloud Storage,
submits a job to Dataproc that runs the pyspark file, then downloads
the output logs from Cloud Storage and verifies the expected output."""

from create_cluster_and_submit_job import create_and_submit_main
from gcp.testing.flaky import flaky

CLUSTER_NAME = 'testcluster2'
PROJECT_ID = 'bill-stackdriver-experiment'
ZONE = 'us-central1-b'


# @flaky
def test_e2e(cloud_config):
    output = create_and_submit_main(
        cloud_config.project, ZONE, CLUSTER_NAME, cloud_config.storage_bucket)
    assert "['Hello,', 'dog', 'elephant', 'panther', 'world!']" in output
61 changes: 61 additions & 0 deletions dataproc/list_clusters.py
@@ -0,0 +1,61 @@
#!/usr/bin/env python
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

""" Sample command-line program for listing Google Dataproc Clusters
"""

import argparse

from apiclient import discovery
from oauth2client.client import GoogleCredentials

# Currently only the "global" region is supported
REGION = "global"


# [START list_clusters]
def list_clusters(dataproc, project):
    result = dataproc.projects().regions().clusters().list(
        projectId=project,
        region=REGION).execute()
    return result
# [END list_clusters]


# [START get_client]
def get_client():
    """Builds a client to the Dataproc API using Application Default
    Credentials."""
    credentials = GoogleCredentials.get_application_default()
    dataproc = discovery.build('dataproc', 'v1', credentials=credentials)
    return dataproc
# [END get_client]


def main(project_id, zone):
    dataproc = get_client()
    result = list_clusters(dataproc, project_id)
    print(result)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        '--project_id', help='Project ID you want to access.', required=True)
    parser.add_argument(
        '--zone', help='Zone to list clusters in.', required=True)

    args = parser.parse_args()
    main(args.project_id, args.zone)
