Merge pull request #314 from GoogleCloudPlatform/dataproc
Add Dataproc Sample
waprin committed Apr 29, 2016
2 parents 4258361 + 03ba7a1 commit c2bcd20
Showing 7 changed files with 419 additions and 0 deletions.
63 changes: 63 additions & 0 deletions dataproc/README.md
@@ -0,0 +1,63 @@
# Cloud Dataproc API Example

Sample command-line programs for interacting with the Cloud Dataproc API.

Note that while this sample demonstrates interacting with Dataproc via the API, the functionality
demonstrated here could also be accomplished using the Cloud Console or the gcloud CLI.
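
For example, listing and creating clusters as done by these samples roughly corresponds to the
following `gcloud` commands (exact flags may vary by `gcloud` CLI version):

    gcloud dataproc clusters list
    gcloud dataproc clusters create testcluster --zone=us-central1-b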

`list_clusters.py` is a simple command-line program that demonstrates connecting to the
Dataproc API and listing the clusters in a region.

`create_cluster_and_submit_job.py` demonstrates how to create a cluster, submit the
`pyspark_sort.py` job, download the output from Google Cloud Storage, and print the result.
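
For reference, `pyspark_sort.py` is a tiny PySpark program along these lines (a sketch only;
see the file shipped with this sample for the exact contents):

    import pyspark

    sc = pyspark.SparkContext()
    # Sort a short list of words and print it to the driver output,
    # which the sample then downloads from Cloud Storage.
    rdd = sc.parallelize(['Hello,', 'world!', 'dog', 'elephant', 'panther'])
    print(sorted(rdd.collect()))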

## Prerequisites to run locally:

* [pip](https://pypi.python.org/pypi/pip)

Go to the [Google Cloud Console](https://console.cloud.google.com).

Under API Manager, search for the Google Cloud Dataproc API and enable it.
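
Alternatively, newer versions of the `gcloud` CLI can enable the API from the command line
(the exact command has changed between releases, so treat this as a sketch):

    gcloud services enable dataproc.googleapis.com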


## Set Up Your Local Dev Environment
To install the dependencies, run the following command. If you want to use [virtualenv](https://virtualenv.readthedocs.org/en/latest/)
(recommended), run it within a virtualenv.

* pip install -r requirements.txt

Create local credentials by running the following command and following the oauth2 flow:

gcloud beta auth application-default login

To run list_clusters.py (it takes positional arguments):

    python list_clusters.py <YOUR-PROJECT-ID> us-central1-b


To run create_cluster_and_submit_job.py, first create a GCS bucket from the Cloud Console or with
gsutil:

gsutil mb gs://<your-input-bucket-name>

Then run:

python create_cluster_and_submit_job.py --project_id=<your-project-id> --zone=us-central1-b --cluster_name=testcluster --gcs_bucket=<your-input-bucket-name>

This will set up a cluster, upload the PySpark file, submit the job, print the result, and then
delete the cluster.

You can optionally pass a `--pyspark_file` argument to submit a script other than the default
`pyspark_sort.py` included with this sample.
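
For example (the script path here is only a placeholder):

    python create_cluster_and_submit_job.py --project_id=<your-project-id> --zone=us-central1-b --cluster_name=testcluster --gcs_bucket=<your-input-bucket-name> --pyspark_file=path/to/your_job.py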

## Running on GCE, GAE, or other environments

On Google App Engine, the credentials should be found automatically.

On Google Compute Engine, the credentials should also be found automatically, but this requires
that you created the instance with the correct scopes, for example:

gcloud compute instances create --scopes="https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/compute,https://www.googleapis.com/auth/compute.readonly" test-instance

If you did not create the instance with the right scopes, you can still download a JSON service
account key and point the `GOOGLE_APPLICATION_CREDENTIALS` environment variable at it.
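
For example, after downloading a service account key (the path is only a placeholder):

    export GOOGLE_APPLICATION_CREDENTIALS=/path/to/your-service-account-key.json
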
Empty file removed dataproc/create_cluster.py
Empty file.
235 changes: 235 additions & 0 deletions dataproc/create_cluster_and_submit_job.py
@@ -0,0 +1,235 @@
#!/usr/bin/env python
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

""" Sample command-line program for listing Google Dataproc Clusters"""

import argparse
import os

from apiclient import discovery
from gcloud import storage
from oauth2client.client import GoogleCredentials

# Currently only the "global" region is supported
REGION = 'global'
DEFAULT_FILENAME = 'pyspark_sort.py'


def get_default_pyspark_file():
"""Gets the PySpark file from this directory"""
current_dir = os.path.dirname(os.path.abspath(__file__))
f = open(os.path.join(current_dir, DEFAULT_FILENAME), 'r')
return f, DEFAULT_FILENAME


def get_pyspark_file(filename):
    """Opens the user-supplied PySpark file."""
    f = open(filename, 'r')
    return f, os.path.basename(filename)


def upload_pyspark_file(project_id, bucket_name, filename, file):
"""Uploads the PySpark file in this directory to the configured
input bucket."""
print('Uploading pyspark file to GCS')
client = storage.Client(project=project_id)
bucket = client.get_bucket(bucket_name)
blob = bucket.blob(filename)
blob.upload_from_file(file)


def download_output(project_id, cluster_id, output_bucket, job_id):
"""Downloads the output file from Cloud Storage and returns it as a
string."""
print('Downloading output file')
client = storage.Client(project=project_id)
bucket = client.get_bucket(output_bucket)
output_blob = (
'google-cloud-dataproc-metainfo/{}/jobs/{}/driveroutput.000000000'
.format(cluster_id, job_id))
return bucket.blob(output_blob).download_as_string()


# [START create_cluster]
def create_cluster(dataproc, project, cluster_name, zone):
print('Creating cluster.')
zone_uri = \
'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(
project, zone)
cluster_data = {
'projectId': project,
'clusterName': cluster_name,
'config': {
'gceClusterConfig': {
'zoneUri': zone_uri
}
}
}
result = dataproc.projects().regions().clusters().create(
projectId=project,
region=REGION,
body=cluster_data).execute()
return result
# [END create_cluster]


def wait_for_cluster_creation(dataproc, project_id, cluster_name, zone):
print('Waiting for cluster creation')

while True:
result = dataproc.projects().regions().clusters().list(
projectId=project_id,
region=REGION).execute()
cluster_list = result['clusters']
cluster = [c
for c in cluster_list
if c['clusterName'] == cluster_name][0]
if cluster['status']['state'] == 'ERROR':
raise Exception(result['status']['details'])
if cluster['status']['state'] == 'RUNNING':
print("Cluster created.")
break


# [START list_clusters_with_detail]
def list_clusters_with_details(dataproc, project):
result = dataproc.projects().regions().clusters().list(
projectId=project,
region=REGION).execute()
cluster_list = result['clusters']
for cluster in cluster_list:
print("{} - {}"
.format(cluster['clusterName'], cluster['status']['state']))
return result
# [END list_clusters_with_detail]


def get_cluster_id_by_name(cluster_list, cluster_name):
"""Helper function to retrieve the ID and output bucket of a cluster by
name."""
cluster = [c for c in cluster_list if c['clusterName'] == cluster_name][0]
return cluster['clusterUuid'], cluster['config']['configBucket']


# [START submit_pyspark_job]
def submit_pyspark_job(dataproc, project, cluster_name, bucket_name, filename):
"""Submits the Pyspark job to the cluster, assuming `filename` has
already been uploaded to `bucket_name`"""
job_details = {
'projectId': project,
'job': {
'placement': {
'clusterName': cluster_name
},
'pysparkJob': {
'mainPythonFileUri': 'gs://{}/{}'.format(bucket_name, filename)
}
}
}
result = dataproc.projects().regions().jobs().submit(
projectId=project,
region=REGION,
body=job_details).execute()
job_id = result['reference']['jobId']
print('Submitted job ID {}'.format(job_id))
return job_id
# [END submit_pyspark_job]


# [START delete]
def delete_cluster(dataproc, project, cluster):
print('Tearing down cluster')
result = dataproc.projects().regions().clusters().delete(
projectId=project,
region=REGION,
clusterName=cluster).execute()
return result
# [END delete]


# [START wait]
def wait_for_job(dataproc, project, job_id):
print('Waiting for job to finish...')
while True:
result = dataproc.projects().regions().jobs().get(
projectId=project,
region=REGION,
jobId=job_id).execute()
        # Raise an exception if the job failed; return once it completes.
if result['status']['state'] == 'ERROR':
print(result)
raise Exception(result['status']['details'])
elif result['status']['state'] == 'DONE':
print('Job finished')
return result
# [END wait]


# [START get_client]
def get_client():
"""Builds an http client authenticated with the service account
credentials."""
credentials = GoogleCredentials.get_application_default()
dataproc = discovery.build('dataproc', 'v1', credentials=credentials)
return dataproc
# [END get_client]


def main(project_id, zone, cluster_name, bucket_name, pyspark_file=None):
dataproc = get_client()
try:
if pyspark_file:
spark_file, spark_filename = get_pyspark_file(pyspark_file)
else:
spark_file, spark_filename = get_default_pyspark_file()

create_cluster(dataproc, project_id, cluster_name, zone)
wait_for_cluster_creation(dataproc, project_id, cluster_name, zone)
upload_pyspark_file(project_id, bucket_name,
spark_filename, spark_file)
cluster_list = list_clusters_with_details(
dataproc, project_id)['clusters']

(cluster_id, output_bucket) = (
get_cluster_id_by_name(cluster_list, cluster_name))
job_id = submit_pyspark_job(
dataproc, project_id, cluster_name, bucket_name, spark_filename)
wait_for_job(dataproc, project_id, job_id)

output = download_output(project_id, cluster_id, output_bucket, job_id)
print('Received job output {}'.format(output))
return output
finally:
delete_cluster(dataproc, project_id, cluster_name)
spark_file.close()


if __name__ == '__main__':
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter
)
    parser.add_argument(
        '--project_id', help='Project ID you want to access.', required=True)
    parser.add_argument(
        '--zone', help='Zone to create the cluster in.', required=True)
    parser.add_argument(
        '--cluster_name', help='Name of the cluster to create.', required=True)
parser.add_argument(
'--gcs_bucket', help='Bucket to upload Pyspark file to', required=True)
parser.add_argument(
'--pyspark_file', help='Pyspark filename. Defaults to pyspark_sort.py')

args = parser.parse_args()
main(
args.project_id, args.zone,
args.cluster_name, args.gcs_bucket, args.pyspark_file)
30 changes: 30 additions & 0 deletions dataproc/dataproc_e2e_test.py
@@ -0,0 +1,30 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

""" Integration tests for Dataproc samples.
Creates a Dataproc cluster, uploads a pyspark file to Google Cloud Storage,
submits a job to Dataproc that runs the pyspark file, then downloads
the output logs from Cloud Storage and verifies the expected output."""

import create_cluster_and_submit_job
from gcp.testing.flaky import flaky

CLUSTER_NAME = 'testcluster2'
ZONE = 'us-central1-b'


@flaky
def test_e2e(cloud_config):
output = create_cluster_and_submit_job.main(
cloud_config.project, ZONE, CLUSTER_NAME, cloud_config.storage_bucket)
assert "['Hello,', 'dog', 'elephant', 'panther', 'world!']" in output
61 changes: 61 additions & 0 deletions dataproc/list_clusters.py
@@ -0,0 +1,61 @@
#!/usr/bin/env python
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

""" Sample command-line program for listing Google Dataproc Clusters
"""

import argparse

from apiclient import discovery
from oauth2client.client import GoogleCredentials

# Currently only the "global" region is supported
REGION = 'global'


# [START list_clusters]
def list_clusters(dataproc, project):
result = dataproc.projects().regions().clusters().list(
projectId=project,
region=REGION).execute()
return result
# [END list_clusters]


# [START get_client]
def get_client():
"""Builds an http client authenticated with the service account
credentials."""
credentials = GoogleCredentials.get_application_default()
dataproc = discovery.build('dataproc', 'v1', credentials=credentials)
return dataproc
# [END get_client]


def main(project_id, zone):
dataproc = get_client()
result = list_clusters(dataproc, project_id)
print(result)

if __name__ == '__main__':
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter
)
    parser.add_argument(
        'project_id', help='Project ID you want to access.')
    parser.add_argument(
        'zone', help='Zone where the clusters are located.')

args = parser.parse_args()
main(args.project_id, args.zone)
