
Conversation

@jiangmichaellll
Contributor

Fixes #261

@jiangmichaellll jiangmichaellll requested a review from a team as a code owner January 26, 2022 20:00
@product-auto-label product-auto-label bot added the api: pubsublite (Issues related to the googleapis/java-pubsublite-spark API) label on Jan 26, 2022
@jiangmichaellll jiangmichaellll force-pushed the jiangmichael-b-199225459 branch from c5d38c3 to 6b642e4 on January 26, 2022 22:06
@jiangmichaellll
Contributor Author

== and != compare references, while sameType compares the actual types.
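
A minimal sketch of that distinction, using Spark's DataTypes API (the class name SameTypeDemo is only for illustration and is not code from this PR). The two ArrayType instances mirror the attributes value column from the linked issue and differ only in their containsNull flag, so a reference comparison and sameType disagree:

import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;

public class SameTypeDemo {
  public static void main(String[] args) {
    // Two array<binary> types that differ only in the containsNull nullability flag.
    DataType expected = DataTypes.createArrayType(DataTypes.BinaryType, true);
    DataType actual = DataTypes.createArrayType(DataTypes.BinaryType, false);

    // == only checks reference identity, so two independently built instances never match.
    System.out.println(expected == actual);        // false
    // sameType compares the actual types, ignoring nullability flags such as containsNull.
    System.out.println(expected.sameType(actual)); // true
  }
}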

@anguillanneuf
Collaborator

anguillanneuf commented Jan 27, 2022

I created a streaming DataFrame with the following schema:

>> sdf.printSchema()

 |-- event_timestamp: timestamp (nullable = true)
 |-- key: binary (nullable = false)
 |-- data: binary (nullable = true)
 |-- attributes: map (nullable = false)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = false)
 |    |    |-- element: binary (containsNull = false)

But writing it to Pub/Sub Lite still failed. The complaint is a java.lang.NoClassDefFoundError for scala/Function2$class, which comes from scala-library, a dependency that only has provided scope:

<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>${scala.version}</version>
  <scope>provided</scope>
</dependency>

2022-01-27 01:08:25 ERROR MicroBatchExecution:91 - Query [id = 281f3108-63f9-488e-a2b3-14d6c68cc773, runId = eb1e277f-a444-41d5-af05-e9d9539738d4] terminated with error
org.apache.spark.SparkException: Writing job aborted.
        at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:136)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:160)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:157)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:252)
        at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:301)
        at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3388)
        at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2788)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3369)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:80)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
        at org.apache.spark.sql.Dataset.collect(Dataset.scala:2788)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:551)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:80)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$14(MicroBatchExecution.scala:547)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:351)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:349)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:546)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:198)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:351)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:349)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:166)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, tz-lite-w-1.c.tz-playground-bigdata.internal, executor 2): java.lang.NoClassDefFoundError: scala/Function2$class
        at scala.compat.java8.functionConverterImpls.FromJavaBiConsumer.<init>(FunctionConverters.scala:12)
        at com.google.cloud.pubsublite.spark.PslSparkUtils.lambda$toPubSubMessage$7(PslSparkUtils.java:149)
        at com.google.cloud.pubsublite.spark.PslSparkUtils.extractVal(PslSparkUtils.java:116)
        at com.google.cloud.pubsublite.spark.PslSparkUtils.toPubSubMessage(PslSparkUtils.java:141)
        at com.google.cloud.pubsublite.spark.PslDataWriter.write(PslDataWriter.java:72)
        at com.google.cloud.pubsublite.spark.PslDataWriter.write(PslDataWriter.java:37)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$2(WriteToDataSourceV2Exec.scala:118)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:116)
        at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.$anonfun$doExecute$2(WriteToDataSourceV2Exec.scala:67)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:414)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
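
The Caused by frames show what actually breaks: PslSparkUtils.toPubSubMessage uses scala-java8-compat's FromJavaBiConsumer to adapt a Java lambda into a Scala function, and constructing it needs scala.Function2's support class from scala-library on the executor classpath (scala/Function2$class only exists in the Scala 2.11 trait encoding). A throwaway probe along these lines, run with the same classpath as the failing executor, makes the missing class obvious; it is only an illustration, not part of the connector:

public class ScalaClasspathProbe {
  public static void main(String[] args) {
    String[] names = {
      "scala.Function2",
      "scala.Function2$class", // the class named in the stack trace; Scala 2.11 only
      "scala.compat.java8.functionConverterImpls.FromJavaBiConsumer"
    };
    for (String name : names) {
      try {
        // Load without initializing, using the probe's own class loader.
        Class.forName(name, false, ScalaClasspathProbe.class.getClassLoader());
        System.out.println(name + " -> found");
      } catch (ClassNotFoundException e) {
        System.out.println(name + " -> missing (would surface as NoClassDefFoundError)");
      }
    }
  }
}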

@jiangmichaellll jiangmichaellll requested a review from a team as a code owner January 27, 2022 01:33
@anguillanneuf
Collaborator

anguillanneuf commented Jan 27, 2022

@jiangmichaellll I tried a Python example after the scala library was included in the shaded jar, and it worked.
I'm surprised that it's not provided by Dataproc. It does increase the size of the shaded jar from 48.3 MiB to 53.8 MiB.
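
One way to make a local test like this pull scala-library into the shaded jar is to drop the provided scope from the dependency shown above, since the Maven Shade plugin only bundles compile- and runtime-scope artifacts by default; whether the follow-up dependency fix takes this exact shape is a separate question:

<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>${scala.version}</version>
  <!-- default (compile) scope instead of provided, so the shade plugin bundles it -->
</dependency>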

@jiangmichaellll
Contributor Author

Thanks for uncovering that dependency issue! I am going to check this in and create another PR tomorrow to fix it.

@jiangmichaellll jiangmichaellll merged commit 451901a into main Jan 27, 2022
@jiangmichaellll jiangmichaellll deleted the jiangmichael-b-199225459 branch January 27, 2022 02:11
Successfully merging this pull request may close these issues: Issue using PySpark writeStream to write to Pub/Sub Lite with an attributes field