I have an application that reads data from a source and writes it to multiple destination tables. Which data goes to which table is decided by filtering at runtime; a minimal sketch of the pattern (table and column names are hypothetical, not the actual application code) looks like this:
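import org.apache.spark.sql.functions.col

// Hypothetical sketch of the write pattern: route rows to per-destination tables at runtime.
val source = spark.table("source_table")
val routes = Map("a" -> "dest_table_a", "b" -> "dest_table_b")
routes.foreach { case (key, dest) =>
  val filtered = source.filter(col("p1") === key)   // may produce an empty DataFrame for some keys
  filtered.writeTo(dest).overwritePartitions()      // fails when `filtered` is empty
}

When one of the filtered DataFrames is empty, the write fails with this exception: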
org.apache.spark.SparkException: Writing job aborted.
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:388)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:336)
at org.apache.spark.sql.execution.datasources.v2.OverwritePartitionsDynamicExec.writeWithV2(WriteToDataSourceV2Exec.scala:278)
at org.apache.spark.sql.execution.datasources.v2.OverwritePartitionsDynamicExec.run(WriteToDataSourceV2Exec.scala:287)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:40)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:40)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.doExecute(V2CommandExec.scala:55)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
at org.apache.spark.sql.DataFrameWriterV2.$anonfun$runCommand$1(DataFrameWriterV2.scala:196)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriterV2.runCommand(DataFrameWriterV2.scala:196)
at org.apache.spark.sql.DataFrameWriterV2.overwritePartitions(DataFrameWriterV2.scala:186)
... 47 elided
Caused by: java.lang.IllegalStateException: Cannot determine partition spec: no data or delete files have been added
at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkState(Preconditions.java:508)
at org.apache.iceberg.MergingSnapshotProducer.writeSpec(MergingSnapshotProducer.java:121)
at org.apache.iceberg.BaseReplacePartitions.apply(BaseReplacePartitions.java:58)
at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:164)
at org.apache.iceberg.BaseReplacePartitions.apply(BaseReplacePartitions.java:26)
at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:283)
at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:405)
at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:214)
at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:198)
at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:190)
at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:282)
at org.apache.iceberg.BaseReplacePartitions.commit(BaseReplacePartitions.java:26)
at org.apache.iceberg.spark.source.SparkWrite.commitOperation(SparkWrite.java:201)
at org.apache.iceberg.spark.source.SparkWrite.access$1300(SparkWrite.java:92)
at org.apache.iceberg.spark.source.SparkWrite$DynamicOverwrite.commit(SparkWrite.java:273)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:371)
... 68 more
This issue can be reproduced in spark-shell as follows (test1 is created but never populated, so the DataFrame written to test2 is empty):
spark.sql("create table test1 (c1 string, p1 string) using iceberg partitioned by (p1)")
spark.sql("create table test2 (c1 string, p1 string) using iceberg partitioned by (p1)")
spark.table("test1").sortWithinPartitions("p1").writeTo("test2").overwritePartitions()
I built the latest code from the master branch and the issue is still present.
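A possible application-side workaround is to skip the overwritePartitions() call when the DataFrame is empty. This is only a minimal sketch reusing the tables from the repro above; whether skipping the write is acceptable depends on the intended overwrite semantics, since existing partitions are then left untouched:

val df = spark.table("test1").sortWithinPartitions("p1")
if (!df.isEmpty) {   // isEmpty triggers a Spark job; skip the write when there is nothing to overwrite
  df.writeTo("test2").overwritePartitions()
}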