diff --git a/pubsublite/spark-connector/README.md b/pubsublite/spark-connector/README.md
new file mode 100644
index 00000000000..0b8a47db46e
--- /dev/null
+++ b/pubsublite/spark-connector/README.md
@@ -0,0 +1,189 @@
+# Using Spark SQL Streaming with Pub/Sub Lite
+
+The samples in this directory show how to read messages from and write messages to Pub/Sub Lite from an [Apache Spark] cluster created with [Cloud Dataproc] using the [Pub/Sub Lite Spark Connector].
+
+Get the connector's uber jar from this [public Cloud Storage location], or download it from this [Maven link]. The uber jar has a "with-dependencies" suffix. You will need to include it on the driver and executor classpaths when submitting a Spark job, typically in the `--jars` flag.
+
+## Before you begin
+
+1. Install the [Cloud SDK].
+   > *Note:* This is not required in [Cloud Shell]
+   > because Cloud Shell has the Cloud SDK pre-installed.
+
+1. Create a new Google Cloud project via the
+   [*New Project* page] or via the `gcloud` command line tool.
+
+   ```sh
+   export PROJECT_ID=your-google-cloud-project-id
+   gcloud projects create $PROJECT_ID
+   ```
+   Or use an existing Google Cloud project.
+   ```sh
+   export PROJECT_ID=$(gcloud config get-value project)
+   ```
+
+1. [Enable billing].
+
+1. Set up the Cloud SDK for your GCP project.
+
+   ```sh
+   gcloud init
+   ```
+
+1. [Enable the APIs]: Pub/Sub Lite, Dataproc, Cloud Storage.
+
+1. Create a Pub/Sub Lite [topic] and [subscription] in a supported [location].
+
+   ```bash
+   export TOPIC_ID=your-topic-id
+   export SUBSCRIPTION_ID=your-subscription-id
+   export PUBSUBLITE_LOCATION=your-location
+
+   gcloud pubsub lite-topics create $TOPIC_ID \
+     --location=$PUBSUBLITE_LOCATION \
+     --partitions=2 \
+     --per-partition-bytes=30GiB
+
+   gcloud pubsub lite-subscriptions create $SUBSCRIPTION_ID \
+     --location=$PUBSUBLITE_LOCATION \
+     --topic=$TOPIC_ID
+   ```
+
+1. Create a Cloud Storage bucket.
+
+   ```bash
+   export BUCKET_ID=your-gcs-bucket-id
+
+   gsutil mb gs://$BUCKET_ID
+   ```
+
+## Python setup
+
+1. [Install Python and virtualenv].
+
+1. Clone the `python-docs-samples` repository.
+
+   ```bash
+   git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
+   ```
+
+1. Navigate to the sample code directory.
+
+   ```bash
+   cd python-docs-samples/pubsublite/spark-connector
+   ```
+
+1. Create a virtual environment and activate it.
+
+   ```bash
+   python -m venv env
+   source env/bin/activate
+   ```
+   > Once you are finished with the tutorial, you can deactivate
+   > the virtualenv and go back to your global Python environment
+   > by running `deactivate`.
+
+1. Install the required packages.
+   ```bash
+   python -m pip install -U -r requirements.txt
+   ```
+
+## Creating a Spark cluster on Dataproc
+
+1. Go to [Cloud Console for Dataproc].
+
+1. Go to Clusters, then [Create Cluster].
+   > **Note:** When setting up the cluster, you must choose
+   > [Dataproc Image Version 1.5] under ___Versioning___ because
+   > the connector currently only supports Spark 2.4.8.
+   > Additionally, in ___Manage security (optional)___, you
+   > must enable the cloud-platform scope for your cluster by
+   > checking "Allow API access to all Google Cloud services in
+   > the same project" under ___Project access___.
+
+   Here is an equivalent example using a `gcloud` command, with an additional optional argument to enable component gateway:
+
+   ```sh
+   export CLUSTER_ID=your-cluster-id
+   export DATAPROC_REGION=your-dataproc-region
+
+   gcloud dataproc clusters create $CLUSTER_ID \
+     --region $DATAPROC_REGION \
+     --image-version 1.5-debian10 \
+     --scopes 'https://www.googleapis.com/auth/cloud-platform' \
+     --enable-component-gateway
+   ```
+
+## Writing to Pub/Sub Lite
+
+[spark_streaming_to_pubsublite_example.py](spark_streaming_to_pubsublite_example.py) creates a streaming source of consecutive numbers with timestamps for 60 seconds and writes them to a Pub/Sub Lite topic.
+
+To submit a write job:
+
+```sh
+export PROJECT_NUMBER=$(gcloud projects list --filter="projectId:$PROJECT_ID" --format="value(PROJECT_NUMBER)")
+
+gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
+  --region=$DATAPROC_REGION \
+  --cluster=$CLUSTER_ID \
+  --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
+  --driver-log-levels=root=INFO \
+  --properties=spark.master=yarn \
+  -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --topic_id=$TOPIC_ID
+```
+
+Visit the job URL in the command output or the jobs panel in [Cloud Console for Dataproc] to monitor the job progress.
+
+You should see INFO logging like the following in the output:
+
+```none
+INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId ..
+```
+
+## Reading from Pub/Sub Lite
+
+[spark_streaming_from_pubsublite_example.py](spark_streaming_from_pubsublite_example.py) reads messages formatted as dataframe rows from a Pub/Sub Lite subscription and prints them out to the console.
+
+To submit a read job:
+
+```sh
+gcloud dataproc jobs submit pyspark spark_streaming_from_pubsublite_example.py \
+  --region=$DATAPROC_REGION \
+  --cluster=$CLUSTER_ID \
+  --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
+  --driver-log-levels=root=INFO \
+  --properties=spark.master=yarn \
+  -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --subscription_id=$SUBSCRIPTION_ID
```
+
+Here is an example output:
+
+```none
++--------------------+---------+------+---+----+--------------------+--------------------+----------+
+|        subscription|partition|offset|key|data|   publish_timestamp|     event_timestamp|attributes|
++--------------------+---------+------+---+----+--------------------+--------------------+----------+
+|projects/50200928...|        0| 89523|  0|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
+|projects/50200928...|        0| 89524|  1|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
+|projects/50200928...|        0| 89525|  2|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
+```
+
+[Apache Spark]: https://spark.apache.org/
+[Pub/Sub Lite Spark Connector]: https://github.com/googleapis/java-pubsublite-spark
+[Cloud Dataproc]: https://cloud.google.com/dataproc/docs/
+[public Cloud Storage location]: gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar
+[Maven link]: https://search.maven.org/search?q=g:com.google.cloud%20a:pubsublite-spark-sql-streaming
+
+[Cloud SDK]: https://cloud.google.com/sdk/docs/
+[Cloud Shell]: https://console.cloud.google.com/cloudshell/editor/
+[*New Project* page]: https://console.cloud.google.com/projectcreate
+[Enable billing]: https://cloud.google.com/billing/docs/how-to/modify-project/
+[Enable the APIs]: https://console.cloud.google.com/flows/enableapi?apiid=pubsublite.googleapis.com,dataproc,storage_component
+[topic]: https://cloud.google.com/pubsub/lite/docs/topics
+[subscription]: https://cloud.google.com/pubsub/lite/docs/subscriptions
+[location]: https://cloud.google.com/pubsub/lite/docs/locations
+
+[Install Python and virtualenv]: https://cloud.google.com/python/setup/
+[Cloud Console for Dataproc]: https://console.cloud.google.com/dataproc/
+
+[Create Cluster]: https://console.cloud.google.com/dataproc/clustersAdd
+[Dataproc Image Version 1.5]: https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-1.5
\ No newline at end of file
diff --git a/pubsublite/spark-connector/noxfile_config.py b/pubsublite/spark-connector/noxfile_config.py
new file mode 100644
index 00000000000..73e9736cb90
--- /dev/null
+++ b/pubsublite/spark-connector/noxfile_config.py
@@ -0,0 +1,42 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Default TEST_CONFIG_OVERRIDE for python repos.
+
+# You can copy this file into your directory, then it will be imported from
+# the noxfile.py.
+
+# The source of truth:
+# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/noxfile_config.py
+
+TEST_CONFIG_OVERRIDE = {
+    # You can opt out from the test for specific Python versions.
+    # NOTE: We currently only run the test in Python 3.7 and 3.8.
+    "ignored_versions": ["2.7", "3.6", "3.9"],
+    # Old samples are opted out of enforcing Python type hints
+    # All new samples should feature them
+    "enforce_type_hints": True,
+    # An envvar key for determining the project id to use. Change it
+    # to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a
+    # build specific Cloud project. You can also use your own string
+    # to use your own Cloud project.
+    "gcloud_project_env": "GOOGLE_CLOUD_PROJECT",
+    # 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT',
+    # A dictionary you want to inject into your test. Don't put any
+    # secrets here. These values will override predefined values.
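+    # These name a pre-existing Cloud Storage bucket and Dataproc cluster that
+    # the tests use; the tests do not create them.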
+    "envs": {
+        "PUBSUBLITE_BUCKET_ID": "pubsublite-spark",
+        "PUBSUBLITE_CLUSTER_ID": "pubsublite-spark",
+    },
+}
diff --git a/pubsublite/spark-connector/requirements-test.txt b/pubsublite/spark-connector/requirements-test.txt
new file mode 100644
index 00000000000..606f339f2b6
--- /dev/null
+++ b/pubsublite/spark-connector/requirements-test.txt
@@ -0,0 +1,4 @@
+google-cloud-dataproc==2.5.0
+google-cloud-pubsublite==1.1.0
+google-cloud-storage==1.42.1
+pytest==6.2.5
\ No newline at end of file
diff --git a/pubsublite/spark-connector/requirements.txt b/pubsublite/spark-connector/requirements.txt
new file mode 100644
index 00000000000..ee95b11b1e6
--- /dev/null
+++ b/pubsublite/spark-connector/requirements.txt
@@ -0,0 +1 @@
+pyspark[sql]==3.1.2
\ No newline at end of file
diff --git a/pubsublite/spark-connector/spark_streaming_from_pubsublite_example.py b/pubsublite/spark-connector/spark_streaming_from_pubsublite_example.py
new file mode 100644
index 00000000000..eeb39bbe94b
--- /dev/null
+++ b/pubsublite/spark-connector/spark_streaming_from_pubsublite_example.py
@@ -0,0 +1,68 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+
+
+def spark_streaming_from_pubsublite(
+    project_number: int, location: str, subscription_id: str
+) -> None:
+    # [START pubsublite_spark_streaming_from_pubsublite]
+    from pyspark.sql import SparkSession
+    from pyspark.sql.types import StringType
+
+    # TODO(developer):
+    # project_number = 11223344556677
+    # location = "us-central1-a"
+    # subscription_id = "your-subscription-id"
+
+    spark = SparkSession.builder.appName("read-app").master("yarn").getOrCreate()
+
+    sdf = (
+        spark.readStream.format("pubsublite")
+        .option(
+            "pubsublite.subscription",
+            f"projects/{project_number}/locations/{location}/subscriptions/{subscription_id}",
+        )
+        .load()
+    )
+
+    sdf = sdf.withColumn("data", sdf.data.cast(StringType()))
+
+    query = (
+        sdf.writeStream.format("console")
+        .outputMode("append")
+        .trigger(processingTime="1 second")
+        .start()
+    )
+
+    # Wait 120 seconds (must be >= 60 seconds) to start receiving messages.
+    query.awaitTermination(120)
+    query.stop()
+    # [END pubsublite_spark_streaming_from_pubsublite]
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(
+        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter,
+    )
+    parser.add_argument("--project_number", help="Google Cloud Project Number")
+    parser.add_argument("--location", help="Your Cloud location, e.g. us-central1-a")
+    parser.add_argument("--subscription_id", help="Your Pub/Sub Lite subscription ID")
+
+    args = parser.parse_args()
+
+    spark_streaming_from_pubsublite(
+        args.project_number, args.location, args.subscription_id
+    )
diff --git a/pubsublite/spark-connector/spark_streaming_test.py b/pubsublite/spark-connector/spark_streaming_test.py
new file mode 100644
index 00000000000..6fa25824dd6
--- /dev/null
+++ b/pubsublite/spark-connector/spark_streaming_test.py
@@ -0,0 +1,234 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import pathlib
+import re
+from typing import Generator
+import uuid
+
+from google.api_core.exceptions import NotFound
+from google.cloud import dataproc_v1, storage
+from google.cloud.pubsublite import AdminClient, Subscription, Topic
+from google.cloud.pubsublite.types import (
+    BacklogLocation,
+    CloudRegion,
+    CloudZone,
+    SubscriptionPath,
+    TopicPath,
+)
+import pytest
+
+PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]
+PROJECT_NUMBER = os.environ["GOOGLE_CLOUD_PROJECT_NUMBER"]
+CLOUD_REGION = "us-west1"
+ZONE_ID = "a"
+CLUSTER_ID = os.environ["PUBSUBLITE_CLUSTER_ID"]
+BUCKET = os.environ["PUBSUBLITE_BUCKET_ID"]
+UUID = uuid.uuid4().hex
+TOPIC_ID = "spark-streaming-topic-" + UUID
+SUBSCRIPTION_ID = "spark-streaming-subscription-" + UUID
+PERMANENT_TOPIC_ID = "spark-streaming-topic"
+CURRENT_DIR = pathlib.Path(__file__).parent.resolve()
+
+
+@pytest.fixture(scope="module")
+def client() -> Generator[AdminClient, None, None]:
+    yield AdminClient(CLOUD_REGION)
+
+
+@pytest.fixture(scope="module")
+def topic(client: AdminClient) -> Generator[Topic, None, None]:
+    location = CloudZone(CloudRegion(CLOUD_REGION), ZONE_ID)
+    topic_path = TopicPath(PROJECT_NUMBER, location, TOPIC_ID)
+
+    # A topic with 2 partitions of 30 GiB each, a publish throughput
+    # capacity of 4 MiB/s per partition, and a subscribe throughput
+    # capacity of 8 MiB/s per partition.
+    topic = Topic(
+        name=str(topic_path),
+        partition_config=Topic.PartitionConfig(
+            count=2,
+            capacity=Topic.PartitionConfig.Capacity(
+                publish_mib_per_sec=4, subscribe_mib_per_sec=8,
+            ),
+        ),
+        retention_config=Topic.RetentionConfig(
+            per_partition_bytes=30 * 1024 * 1024 * 1024,
+        ),
+    )
+
+    try:
+        response = client.get_topic(topic.name)
+    except NotFound:
+        response = client.create_topic(topic)
+
+    yield response
+
+    try:
+        client.delete_topic(response.name)
+    except NotFound as e:
+        print(e.message)
+
+
+@pytest.fixture(scope="module")
+def subscription(client: AdminClient) -> Generator[Subscription, None, None]:
+    location = CloudZone(CloudRegion(CLOUD_REGION), ZONE_ID)
+    subscription_path = SubscriptionPath(PROJECT_NUMBER, location, SUBSCRIPTION_ID)
+
+    subscription = Subscription(
+        name=str(subscription_path),
+        topic=f"projects/{PROJECT_NUMBER}/locations/{location}/topics/{PERMANENT_TOPIC_ID}",
+        delivery_config=Subscription.DeliveryConfig(
+            delivery_requirement=Subscription.DeliveryConfig.DeliveryRequirement.DELIVER_IMMEDIATELY,
+        ),
+    )
+
+    try:
+        response = client.get_subscription(subscription.name)
+    except NotFound:
+        # This subscription will start receiving the first message in the topic.
+        response = client.create_subscription(subscription, BacklogLocation.BEGINNING)
+    yield response
+    try:
+        client.delete_subscription(response.name)
+    except NotFound as e:
+        print(e.message)
+
+
+def pyfile(source_file: str) -> str:
+    storage_client = storage.Client()
+    bucket = storage_client.bucket(BUCKET)
+    destination_blob_name = os.path.join(UUID, source_file)
+    blob = bucket.blob(destination_blob_name)
+    blob.upload_from_filename(source_file)
+    return "gs://" + blob.bucket.name + "/" + blob.name
+
+
+def test_spark_streaming_to_pubsublite(topic: Topic) -> None:
+    from google.cloud.dataproc_v1.types import LoggingConfig
+
+    # Create a Dataproc job client.
+    job_client = dataproc_v1.JobControllerClient(
+        client_options={
+            "api_endpoint": "{}-dataproc.googleapis.com:443".format(CLOUD_REGION)
+        }
+    )
+
+    # Create the job config.
+    job = {
+        "placement": {"cluster_name": CLUSTER_ID},
+        "pyspark_job": {
+            "main_python_file_uri": pyfile("spark_streaming_to_pubsublite_example.py"),
+            "jar_file_uris": [
+                "gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar"
+            ],
+            "properties": {"spark.master": "yarn"},
+            "logging_config": {"driver_log_levels": {"root": LoggingConfig.Level.INFO}},
+            "args": [
+                f"--project_number={PROJECT_NUMBER}",
+                f"--location={CLOUD_REGION}-{ZONE_ID}",
+                f"--topic_id={TOPIC_ID}",
+            ],
+        },
+    }
+
+    operation = job_client.submit_job_as_operation(
+        request={
+            "project_id": PROJECT_ID,
+            "region": CLOUD_REGION,
+            "job": job,
+            "request_id": "write-" + UUID,
+        }
+    )
+    response = operation.result()
+
+    # Dataproc job output gets saved to the Google Cloud Storage bucket
+    # allocated to the job. Use a regex to obtain the bucket and blob info.
+    matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
+
+    output = (
+        storage.Client()
+        .get_bucket(matches.group(1))
+        .blob(f"{matches.group(2)}.000000000")
+        .download_as_text()
+    )
+
+    assert "Committed 1 messages for epochId" in output
+
+
+def test_spark_streaming_from_pubsublite(subscription: Subscription) -> None:
+    from google.cloud.dataproc_v1.types import LoggingConfig
+
+    # Create a Dataproc job client.
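+    # The client_options below point the client at the regional Dataproc
+    # endpoint so the job is submitted in CLOUD_REGION.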
+    job_client = dataproc_v1.JobControllerClient(
+        client_options={
+            "api_endpoint": "{}-dataproc.googleapis.com:443".format(CLOUD_REGION)
+        }
+    )
+
+    # Create the job config.
+    job = {
+        "placement": {"cluster_name": CLUSTER_ID},
+        "pyspark_job": {
+            "main_python_file_uri": pyfile(
+                "spark_streaming_from_pubsublite_example.py"
+            ),
+            "jar_file_uris": [
+                "gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar"
+            ],
+            "properties": {"spark.master": "yarn"},
+            "logging_config": {"driver_log_levels": {"root": LoggingConfig.Level.INFO}},
+            "args": [
+                f"--project_number={PROJECT_NUMBER}",
+                f"--location={CLOUD_REGION}-{ZONE_ID}",
+                f"--subscription_id={SUBSCRIPTION_ID}",
+            ],
+        },
+    }
+
+    operation = job_client.submit_job_as_operation(
+        request={
+            "project_id": PROJECT_ID,
+            "region": CLOUD_REGION,
+            "job": job,
+            "request_id": "read-" + UUID,
+        }
+    )
+    response = operation.result()
+
+    # Dataproc job output gets saved to the Google Cloud Storage bucket
+    # allocated to the job. Use a regex to obtain the bucket and blob info.
+    matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
+
+    output = (
+        storage.Client()
+        .get_bucket(matches.group(1))
+        .blob(f"{matches.group(2)}.000000000")
+        .download_as_text()
+    )
+
+    assert "Batch: 0\n" in output
+    assert (
+        "+--------------------+---------+------+----+------+"
+        + "--------------------+--------------------+----------+\n"
+        + "|        subscription|partition|offset| key|  data"
+        + "|   publish_timestamp|     event_timestamp|attributes|\n"
+        + "+--------------------+---------+------+----+------+"
+        + "--------------------+--------------------+----------+\n"
+        + "|projects/10126164...|        0|     0|[34]|353534"
+        + "|2021-09-15 21:55:...|2021-09-15 00:04:...|        []|\n"
+        in output
+    )
diff --git a/pubsublite/spark-connector/spark_streaming_to_pubsublite_example.py b/pubsublite/spark-connector/spark_streaming_to_pubsublite_example.py
new file mode 100644
index 00000000000..91e3a5731e6
--- /dev/null
+++ b/pubsublite/spark-connector/spark_streaming_to_pubsublite_example.py
@@ -0,0 +1,76 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+
+
+def spark_streaming_to_pubsublite(
+    project_number: int, location: str, topic_id: str
+) -> None:
+    # [START pubsublite_spark_streaming_to_pubsublite]
+    from pyspark.sql import SparkSession
+    from pyspark.sql.types import BinaryType, StringType
+    import uuid
+
+    # TODO(developer):
+    # project_number = 11223344556677
+    # location = "us-central1-a"
+    # topic_id = "your-topic-id"
+
+    spark = SparkSession.builder.appName("write-app").master("yarn").getOrCreate()
+
+    # Create a RateStreamSource that generates consecutive numbers with timestamps:
+    # |-- timestamp: timestamp (nullable = true)
+    # |-- value: long (nullable = true)
+    sdf = spark.readStream.format("rate").option("rowsPerSecond", 1).load()
+
+    sdf = (
+        sdf.withColumn("key", (sdf.value % 5).cast(StringType()).cast(BinaryType()))
+        .withColumn("event_timestamp", sdf.timestamp)
+        .withColumn("data", sdf.value.cast(StringType()).cast(BinaryType()))
+        .drop("value", "timestamp")
+    )
+
+    sdf.printSchema()
+
+    query = (
+        sdf.writeStream.format("pubsublite")
+        .option(
+            "pubsublite.topic",
+            f"projects/{project_number}/locations/{location}/topics/{topic_id}",
+        )
+        # Required. Use a unique checkpoint location for each job.
+        .option("checkpointLocation", "/tmp/app" + uuid.uuid4().hex)
+        .outputMode("append")
+        .trigger(processingTime="1 second")
+        .start()
+    )
+
+    # Wait 60 seconds to terminate the query.
+    query.awaitTermination(60)
+    query.stop()
+    # [END pubsublite_spark_streaming_to_pubsublite]
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(
+        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter,
+    )
+    parser.add_argument("--project_number", help="Google Cloud Project Number")
+    parser.add_argument("--location", help="Your Cloud location, e.g. us-central1-a")
+    parser.add_argument("--topic_id", help="Your Pub/Sub Lite topic ID")
+
+    args = parser.parse_args()
+
+    spark_streaming_to_pubsublite(args.project_number, args.location, args.topic_id)