diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 4f7d5069be1eb..e4ec4c05fafaf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -159,6 +159,17 @@ object FileFormatWriter extends Logging {
       statsTrackers = statsTrackers
     )
 
+    SQLExecution.checkSQLExecutionId(sparkSession)
+
+    // propagate the description UUID into the jobs, so that committers
+    // get an ID guaranteed to be unique.
+    job.getConfiguration.set("spark.sql.sources.writeJobUUID", description.uuid)
+
+    // This call shouldn't be put into the `try` block below because it only initializes and
+    // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
+    // It must be run before `materializeAdaptiveSparkPlan()`.
+    committer.setupJob(job)
+
     // We should first sort by partition columns, then bucket id, and finally sorting columns.
     val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
 
@@ -181,16 +192,6 @@ object FileFormatWriter extends Logging {
       }
     }
 
-    SQLExecution.checkSQLExecutionId(sparkSession)
-
-    // propagate the description UUID into the jobs, so that committers
-    // get an ID guaranteed to be unique.
-    job.getConfiguration.set("spark.sql.sources.writeJobUUID", description.uuid)
-
-    // This call shouldn't be put into the `try` block below because it only initializes and
-    // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
-    committer.setupJob(job)
-
     try {
       val (rdd, concurrentOutputWriterSpec) = if (orderingMatched) {
         (materializedPlan.execute(), None)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index ccdba809292de..41cae6a280af0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.parquet.hadoop.ParquetFileReader
 import org.apache.parquet.hadoop.util.HadoopInputFile
 import org.apache.parquet.schema.PrimitiveType
@@ -32,7 +32,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 import org.apache.parquet.schema.Type.Repetition
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
@@ -1275,4 +1275,27 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
       }
     }
   }
+
+  test("SPARK-43327: location exists when insertoverwrite fails") {
+    withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") {
+      withTable("t", "t1") {
+        sql("create table t(c1 int) using parquet")
+        sql("create table t1(c2 long) using parquet")
+        sql("INSERT OVERWRITE TABLE t1 select 6000044164")
+
+        // spark.sql("CREATE TABLE IF NOT EXISTS t(amt1 int) using ORC")
+        val identifier = TableIdentifier("t")
+        val location = spark.sessionState.catalog.getTableMetadata(identifier).location
+
+        intercept[SparkException] {
+          sql("INSERT OVERWRITE TABLE t select c2 from " +
+            "(select cast(c2 as int) as c2 from t1 distribute by c2)")
+        }
+        // scalastyle:off hadoopconfiguration
+        val fs = FileSystem.get(location, spark.sparkContext.hadoopConfiguration)
+        // scalastyle:on hadoopconfiguration
+        assert(fs.exists(new Path(location)))
+      }
+    }
+  }
 }