Conversation

@anguillanneuf anguillanneuf commented Sep 10, 2021

This PR contains code that will turn into a quickstart page on cloud.google.com/pubsub/docs.

Notes to the reviewer:

  1. Creating a dataframe column named "attributes" is currently blocked on googleapis/java-pubsublite-spark#261 ("Issue using PySpark writeStream to write to Pub/Sub Lite with an attributes field"). cc: @jiangmichaellll
     Once unblocked, I can add these lines:

         sdf = (
             sdf.withColumn("key", (sdf.value % 5).cast(StringType()).cast(BinaryType()))
             .withColumn("event_timestamp", sdf.timestamp)
             .withColumn("data", sdf.value.cast(StringType()).cast(BinaryType()))
             # .withColumn(
             #     "attributes",
             #     create_map(lit("prop1"), array(divisible_by_two_udf("value").cast(BinaryType())))
             #     .cast(MapType(StringType(), ArrayType(BinaryType()), True)),
             # )
             .drop("value", "timestamp")
         )

     Also added a TODO.
  2. Region tags are tracked in b/199554829 and b/199554739.
  3. Internal cl/396882350.

snippet-bot bot commented Sep 10, 2021

Here is the summary of changes.

You are about to add 2 region tags.

This comment is generated by snippet-bot.
If you find problems with this result, please file an issue at:
https://github.com/googleapis/repo-automation-bots/issues.

@product-auto-label product-auto-label bot added the samples Issues that are directly related to samples. label Sep 10, 2021
@google-cla google-cla bot added the cla: yes This human has signed the Contributor License Agreement. label Sep 10, 2021
@anguillanneuf anguillanneuf marked this pull request as ready for review September 10, 2021 22:09
@anguillanneuf anguillanneuf requested a review from a team as a code owner September 10, 2021 22:09
@anguillanneuf anguillanneuf changed the title Spark SQL Streaming to Pub/Sub Lite using Pub/Sub Lite Spark Connector feat(pubsublite): Spark SQL Streaming to Pub/Sub Lite using Pub/Sub Lite Spark Connector Sep 10, 2021
@dandhlee dandhlee added the api: pubsublite Issues related to the Pub/Sub Lite API. label Sep 10, 2021
Collaborator

@dandhlee dandhlee left a comment


Thanks for submitting this! A few comments below.

As well, is there another Pub/Sub expert who could take a look at this sample as a product owner?

@anguillanneuf
Member Author

anguillanneuf commented Sep 14, 2021

As well, is there another Pub/Sub expert who could take a look at this sample as a product owner?

@jiangmichaellll (developer for the connector, he is a Pub/Sub Lite dev) should be the person to review this.


@jiangmichaellll jiangmichaellll left a comment


Mostly looking good; let me know when you need the next round of review.

Collaborator

@dandhlee dandhlee left a comment


Almost there!

TEST_CONFIG_OVERRIDE = {
    # You can opt out from the test for specific Python versions.
    # NOTE: Apache Beam does not currently support Python 3.9.
    "ignored_versions": ["2.7", "3.6", "3.7", "3.9"],
Collaborator


I think I missed this before, but is it a strict requirement to only use 3.8?

Member Author


I can open this up as a last step. Right now I just want the test to run fast.

Member Author

@anguillanneuf anguillanneuf Sep 15, 2021


@jiangmichaellll Looks like multiple streaming jobs in the same cluster will produce "Concurrent update to the log. Multiple streaming jobs detected" and cause one job to fail. See job log.

@dandhlee Since we are just using one Spark cluster, I vote we just test against one Python version.


Hmm, did you manually run the jobs concurrently? I've never seen this before. I usually either change the app name or only run one at a time.

Member Author


When two Python versions are selected, two ITs run in parallel (two different topics created at roughly the same time, and two PySpark jobs submitted at roughly the same time as well).
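A minimal sketch of how the collision could be avoided by making resource names unique per test run (the helper name and suffix scheme are my own illustration, not code from this PR):

```python
import uuid

# One short random suffix per test run: two integration-test runs submitted
# at roughly the same time then create differently named topics and
# subscriptions instead of colliding on shared names.
RUN_ID = uuid.uuid4().hex[:8]


def unique_resource_id(prefix: str) -> str:
    """Return a per-run resource name such as 'my-topic-1a2b3c4d'."""
    return f"{prefix}-{RUN_ID}"
```

Each test would then create its topic via `unique_resource_id("my-topic")`, so parallel runs never share a name while all resources from one run stay easy to find and clean up.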

Member Author

@anguillanneuf anguillanneuf Sep 16, 2021


Used a uuid in appName when starting up a SparkSession. But one write job would still fail (log) and the other succeed (log).

Member Author

@anguillanneuf anguillanneuf Sep 16, 2021


Using a uuid in checkpointLocation solved the "Concurrent update to the log" error.
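For reference, the fix amounts to something like the following sketch (the bucket name and helper function are illustrative assumptions, not code from the sample):

```python
import uuid


def unique_checkpoint_location(bucket: str) -> str:
    """Build a per-query Spark streaming checkpoint path.

    Spark keeps its streaming metadata log under checkpointLocation, so two
    concurrent queries sharing one path race on that log and one fails with
    "Concurrent update to the log". A uuid suffix gives each query its own
    directory.
    """
    return f"gs://{bucket}/pubsublite-spark-checkpoint-{uuid.uuid4().hex}"
```

The result would then be passed on the write side, e.g. `.option("checkpointLocation", unique_checkpoint_location("my-bucket"))` on the streaming query.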

Member Author

@anguillanneuf anguillanneuf Sep 16, 2021


But occasionally you may see something like this, where no messages are published to Pub/Sub Lite due to insufficient worker resources (which may happen if multiple presubmit tests are running, each requesting multiple Python versions):

21/09/16 19:19:53 INFO com.google.cloud.pubsublite.spark.PslSparkUtils: Input schema to write to Pub/Sub Lite doesn't contain attributes column, this field for all rows will be set to empty. [CONTEXT ratelimit_period="5 MINUTES" ]
21/09/16 19:19:54 INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 0 messages for epochId:0.
21/09/16 19:19:54 WARN org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 1334 milliseconds
21/09/16 19:20:09 WARN org.apache.spark.scheduler.cluster.YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
21/09/16 19:20:24 WARN org.apache.spark.scheduler.cluster.YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
21/09/16 19:20:39 WARN org.apache.spark.scheduler.cluster.YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
21/09/16 19:20:52 ERROR org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec: Data source writer org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@6d4c678c is aborting.
21/09/16 19:20:52 WARN com.google.cloud.pubsublite.spark.PslStreamWriter: Epoch id: 1 is aborted, 0 messages might have been published.


@jiangmichaellll jiangmichaellll left a comment


LGTM, assuming you add a uuid suffix to the appName to address the concurrency issue.

@anguillanneuf anguillanneuf force-pushed the pubsublite branch 2 times, most recently from 83c36f3 to bd21883 Compare September 16, 2021 19:13
Collaborator

@dandhlee dandhlee left a comment


LGTM!

@dandhlee
Collaborator

Admin merging since we have a product expert review and a samples reviewer review!

@dandhlee dandhlee merged commit 267a36a into master Sep 16, 2021
@dandhlee dandhlee deleted the pubsublite branch September 16, 2021 21:54