Merged

Changes from all commits (30 commits)
77e262f
add initial files
anguillanneuf Sep 2, 2021
c789aaa
add key, event time, data but not attributes
anguillanneuf Sep 3, 2021
10df79a
pytest passes
anguillanneuf Sep 9, 2021
6668dd7
nox tests pass and add readme
anguillanneuf Sep 10, 2021
7c1a63d
add license header
anguillanneuf Sep 10, 2021
61564a0
Merge branch 'master' into pubsublite
anguillanneuf Sep 10, 2021
a7750b2
update region tag
anguillanneuf Sep 10, 2021
da4e9ee
Merge branch 'master' into pubsublite
anguillanneuf Sep 13, 2021
857d916
Merge branch 'master' into pubsublite
anguillanneuf Sep 14, 2021
fce1d3c
Update pubsublite/spark-connector/README.md
anguillanneuf Sep 14, 2021
b620106
address reviewer comments
anguillanneuf Sep 14, 2021
f35d250
Merge branch 'pubsublite' of github.com:GoogleCloudPlatform/python-do…
anguillanneuf Sep 14, 2021
c72510f
Merge branch 'master' into pubsublite
anguillanneuf Sep 14, 2021
15100aa
Merge branch 'master' into pubsublite
anguillanneuf Sep 14, 2021
418db54
use different uber jar uri
anguillanneuf Sep 14, 2021
a3abebe
Merge branch 'pubsublite' of github.com:GoogleCloudPlatform/python-do…
anguillanneuf Sep 14, 2021
5e0f794
address reviewer comment
anguillanneuf Sep 14, 2021
f7a2636
Merge branch 'master' into pubsublite
anguillanneuf Sep 14, 2021
55ae470
chore(deps): update dependency google-cloud-bigquery to v2.26.0 (#6654)
renovate-bot Sep 15, 2021
329d376
address reviewer comments
anguillanneuf Sep 15, 2021
fb49887
lint
anguillanneuf Sep 15, 2021
9f0f9c2
Merge branch 'master' into pubsublite
anguillanneuf Sep 15, 2021
4d88501
read from the starting offset
anguillanneuf Sep 15, 2021
f1f4867
run test in py-3.8 and py-3.9
anguillanneuf Sep 15, 2021
3209bba
only test in py-3.8 due to concurrent logging limitation
anguillanneuf Sep 15, 2021
86f7b8a
address jiangmichaelll's comments
anguillanneuf Sep 16, 2021
507d9e6
merge conflicts
anguillanneuf Sep 16, 2021
a7cc8a0
address jiangmichaelll's comments
anguillanneuf Sep 16, 2021
7ddde9c
Merge branch 'master' into pubsublite
anguillanneuf Sep 16, 2021
5c010f1
Merge branch 'master' into pubsublite
anguillanneuf Sep 16, 2021
189 changes: 189 additions & 0 deletions pubsublite/spark-connector/README.md
@@ -0,0 +1,189 @@
# Using Spark SQL Streaming with Pub/Sub Lite

The samples in this directory show how to read messages from and write messages to Pub/Sub Lite from an [Apache Spark] cluster created with [Cloud Dataproc] using the [Pub/Sub Lite Spark Connector].

Get the connector's uber jar from this [public Cloud Storage location], or download it from this [Maven link]; the uber jar has a "with-dependencies" suffix. You will need to include it on the driver and executor classpaths when submitting a Spark job, typically via the `--jars` flag.
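
If you build the Spark session yourself rather than passing `--jars`, Spark's standard `spark.jars` property points at the same jar. A minimal sketch, assuming the cluster can resolve the `gs://` URI (as Dataproc clusters can):

```python
from pyspark.sql import SparkSession

# Attach the connector uber jar to the driver and executor classpaths.
# This mirrors what the --jars flag does when submitting a job.
spark = (
    SparkSession.builder.appName("pubsublite-app")
    .config(
        "spark.jars",
        "gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar",
    )
    .getOrCreate()
)
```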

## Before you begin

1. Install the [Cloud SDK].
> *Note:* This is not required in [Cloud Shell]
> because Cloud Shell has the Cloud SDK pre-installed.

1. Create a new Google Cloud project via the
[*New Project* page] or via the `gcloud` command line tool.

```sh
export PROJECT_ID=your-google-cloud-project-id
gcloud projects create $PROJECT_ID
```
Or use an existing Google Cloud project.
```sh
export PROJECT_ID=$(gcloud config get-value project)
```

1. [Enable billing].

1. Set up the Cloud SDK for your Google Cloud project.

```sh
gcloud init
```

1. [Enable the APIs]: Pub/Sub Lite, Dataproc, Cloud Storage.

1. Create a Pub/Sub Lite [topic] and [subscription] in a supported [location].

```bash
export TOPIC_ID=your-topic-id
export SUBSCRIPTION_ID=your-subscription-id
export PUBSUBLITE_LOCATION=your-location

gcloud pubsub lite-topics create $TOPIC_ID \
--location=$PUBSUBLITE_LOCATION \
--partitions=2 \
--per-partition-bytes=30GiB

gcloud pubsub lite-subscriptions create $SUBSCRIPTION_ID \
--location=$PUBSUBLITE_LOCATION \
--topic=$TOPIC_ID
```

1. Create a Cloud Storage bucket.

```bash
export BUCKET_ID=your-gcs-bucket-id

gsutil mb gs://$BUCKET_ID
```

## Python setup

1. [Install Python and virtualenv].

1. Clone the `python-docs-samples` repository.

```bash
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
```

1. Navigate to the sample code directory.

```bash
cd python-docs-samples/pubsublite/spark-connector
```

1. Create a virtual environment and activate it.

```bash
python -m venv env
source env/bin/activate
```
> Once you are finished with the tutorial, you can deactivate
> the virtualenv and go back to your global Python environment
> by running `deactivate`.

1. Install the required packages.
```bash
python -m pip install -U -r requirements.txt
```

## Creating a Spark cluster on Dataproc

1. Go to [Cloud Console for Dataproc].

1. Go to Clusters, then [Create Cluster].
> **Note:** When setting up the cluster, you must choose
> [Dataproc Image Version 1.5] under ___Versioning___ because
> the connector currently only supports Spark 2.4.8.
> Additionally, in ___Manage security (optional)___, you
> must enable the cloud-platform scope for your cluster by
> checking "Allow API access to all Google Cloud services in
> the same project" under ___Project access___.

Here is an equivalent example using a `gcloud` command, with an optional flag added to enable the component gateway:

```sh
export CLUSTER_ID=your-cluster-id
export DATAPROC_REGION=your-dataproc-region

gcloud dataproc clusters create $CLUSTER_ID \
--region $DATAPROC_REGION \
--image-version 1.5-debian10 \
--scopes 'https://www.googleapis.com/auth/cloud-platform' \
--enable-component-gateway
```

## Writing to Pub/Sub Lite

[spark_streaming_to_pubsublite_example.py](spark_streaming_to_pubsublite_example.py) creates a streaming source of consecutive numbers with timestamps for 60 seconds and writes them to a Pub/Sub Lite topic.
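
For orientation, a minimal sketch of such a write pipeline is shown below. This is not the sample itself; the `rate` source, the binary `key`/`data` columns, the `pubsublite.topic` option, and the checkpoint path are assumptions about the connector's write API for illustration only:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Placeholder values for illustration only.
project_number = 11223344556677
location = "us-central1-a"
topic_id = "your-topic-id"

spark = SparkSession.builder.appName("write-app").master("yarn").getOrCreate()

# The built-in "rate" source emits (timestamp, value) rows at a fixed rate.
# Cast the counter to bytes because the connector expects binary payloads.
sdf = (
    spark.readStream.format("rate")
    .option("rowsPerSecond", 1)
    .load()
    .withColumn("key", col("value").cast("string").cast("binary"))
    .withColumn("data", col("value").cast("string").cast("binary"))
    .withColumnRenamed("timestamp", "event_timestamp")
    .select("key", "data", "event_timestamp")
)

query = (
    sdf.writeStream.format("pubsublite")
    .option(
        "pubsublite.topic",
        f"projects/{project_number}/locations/{location}/topics/{topic_id}",
    )
    # A checkpoint location is required for any streaming write.
    .option("checkpointLocation", "/tmp/pubsublite-spark-checkpoint")
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Publish for roughly 60 seconds, then stop.
query.awaitTermination(60)
query.stop()
```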

To submit a write job:

```sh
export PROJECT_NUMBER=$(gcloud projects list --filter="projectId:$PROJECT_ID" --format="value(PROJECT_NUMBER)")

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
--region=$DATAPROC_REGION \
--cluster=$CLUSTER_ID \
--jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
--driver-log-levels=root=INFO \
--properties=spark.master=yarn \
-- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --topic_id=$TOPIC_ID
```

Visit the job URL in the command output or the jobs panel in [Cloud Console for Dataproc] to monitor the job progress.

You should see INFO logging like the following in the output:

```none
INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId ..
```

## Reading from Pub/Sub Lite

[spark_streaming_from_pubsublite_example.py](spark_streaming_from_pubsublite_example.py) reads messages formatted as dataframe rows from a Pub/Sub Lite subscription and prints them to the console.

To submit a read job:

```sh
gcloud dataproc jobs submit pyspark spark_streaming_from_pubsublite_example.py \
--region=$DATAPROC_REGION \
--cluster=$CLUSTER_ID \
--jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
--driver-log-levels=root=INFO \
--properties=spark.master=yarn \
-- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --subscription_id=$SUBSCRIPTION_ID
```

Here is an example output: <!--TODO: update attributes field output with the next release of the connector-->

```none
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
| subscription|partition|offset|key|data| publish_timestamp| event_timestamp|attributes|
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|projects/50200928...| 0| 89523| 0| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
|projects/50200928...| 0| 89524| 1| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
|projects/50200928...| 0| 89525| 2| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
```

[Apache Spark]: https://spark.apache.org/
[Pub/Sub Lite Spark Connector]: https://github.com/googleapis/java-pubsublite-spark
[Cloud Dataproc]: https://cloud.google.com/dataproc/docs/
[public Cloud Storage location]: gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar
[Maven link]: https://search.maven.org/search?q=g:com.google.cloud%20a:pubsublite-spark-sql-streaming

[Cloud SDK]: https://cloud.google.com/sdk/docs/
[Cloud Shell]: https://console.cloud.google.com/cloudshell/editor/
[*New Project* page]: https://console.cloud.google.com/projectcreate
[Enable billing]: https://cloud.google.com/billing/docs/how-to/modify-project/
[Enable the APIs]: https://console.cloud.google.com/flows/enableapi?apiid=pubsublite.googleapis.com,dataproc,storage_component
[topic]: https://cloud.google.com/pubsub/lite/docs/topics
[subscription]: https://cloud.google.com/pubsub/lite/docs/subscriptions
[location]: https://cloud.google.com/pubsub/lite/docs/locations

[Install Python and virtualenv]: https://cloud.google.com/python/setup/
[Cloud Console for Dataproc]: https://console.cloud.google.com/dataproc/

[Create Cluster]: https://console.cloud.google.com/dataproc/clustersAdd
[Dataproc Image Version 1.5]: https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-1.5
42 changes: 42 additions & 0 deletions pubsublite/spark-connector/noxfile_config.py
@@ -0,0 +1,42 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Default TEST_CONFIG_OVERRIDE for python repos.

# You can copy this file into your directory, then it will be imported from
# the noxfile.py.

# The source of truth:
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/noxfile_config.py

TEST_CONFIG_OVERRIDE = {
    # You can opt out from the test for specific Python versions.
    # NOTE: We currently only run the test in Python 3.7 and 3.8.
    "ignored_versions": ["2.7", "3.6", "3.9"],
    # Old samples are opted out of enforcing Python type hints
    # All new samples should feature them
    "enforce_type_hints": True,
    # An envvar key for determining the project id to use. Change it
    # to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a
    # build specific Cloud project. You can also use your own string
    # to use your own Cloud project.
    "gcloud_project_env": "GOOGLE_CLOUD_PROJECT",
    # 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT',
    # A dictionary you want to inject into your test. Don't put any
    # secrets here. These values will override predefined values.
    "envs": {
        "PUBSUBLITE_BUCKET_ID": "pubsublite-spark",
        "PUBSUBLITE_CLUSTER_ID": "pubsublite-spark",
    },
}
4 changes: 4 additions & 0 deletions pubsublite/spark-connector/requirements-test.txt
@@ -0,0 +1,4 @@
google-cloud-dataproc==2.5.0
google-cloud-pubsublite==1.1.0
google-cloud-storage==1.42.1
pytest==6.2.5
1 change: 1 addition & 0 deletions pubsublite/spark-connector/requirements.txt
@@ -0,0 +1 @@
pyspark[sql]==3.1.2
68 changes: 68 additions & 0 deletions pubsublite/spark-connector/spark_streaming_from_pubsublite_example.py
@@ -0,0 +1,68 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse


def spark_streaming_from_pubsublite(
    project_number: int, location: str, subscription_id: str
) -> None:
    # [START pubsublite_spark_streaming_from_pubsublite]
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StringType

    # TODO(developer):
    # project_number = 11223344556677
    # location = "us-central1-a"
    # subscription_id = "your-subscription-id"

    spark = SparkSession.builder.appName("read-app").master("yarn").getOrCreate()

    sdf = (
        spark.readStream.format("pubsublite")
        .option(
            "pubsublite.subscription",
            f"projects/{project_number}/locations/{location}/subscriptions/{subscription_id}",
        )
        .load()
    )

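    # The message payload arrives in the binary "data" column;
    # cast it to a string so the console sink prints it readably.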
    sdf = sdf.withColumn("data", sdf.data.cast(StringType()))

    query = (
        sdf.writeStream.format("console")
        .outputMode("append")
        .trigger(processingTime="1 second")
        .start()
    )

    # Wait 120 seconds (must be >= 60 seconds) to start receiving messages.
    query.awaitTermination(120)
    query.stop()
    # [END pubsublite_spark_streaming_from_pubsublite]


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--project_number", type=int, help="Google Cloud Project Number"
    )
    parser.add_argument("--location", help="Your Cloud location, e.g. us-central1-a")
    parser.add_argument("--subscription_id", help="Your Pub/Sub Lite subscription ID")

    args = parser.parse_args()

    spark_streaming_from_pubsublite(
        args.project_number, args.location, args.subscription_id
    )