diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
index e6e9c9e32885..d76ec99b068c 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
@@ -120,8 +120,9 @@ abstract class FileCommitProtocol {
    * Specifies that a file should be deleted with the commit of this job. The default
    * implementation deletes the file immediately.
    */
-  def deleteWithJob(fs: FileSystem, path: Path, recursive: Boolean): Boolean = {
-    fs.delete(path, recursive)
+  def deleteWithJob(fs: FileSystem, path: Path,
+      canDeleteNow: Boolean = true): Boolean = {
+    fs.delete(path, true)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 3e60c50ada59..f7d9c3a07a67 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -17,13 +17,14 @@
 
 package org.apache.spark.internal.io
 
+import java.io.IOException
 import java.util.{Date, UUID}
 
 import scala.collection.mutable
 import scala.util.Try
 
 import org.apache.hadoop.conf.Configurable
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
@@ -84,6 +85,12 @@ class HadoopMapReduceCommitProtocol(
    */
   @transient private var partitionPaths: mutable.Set[String] = null
 
+  /**
+   * Tracks the files to be deleted when the job commits.
+   */
+  @transient private val pathsToDelete: mutable.Map[FileSystem, mutable.Set[Path]] =
+    mutable.HashMap()
+
   /**
    * The staging directory of this write job. Spark uses it to deal with files with absolute output
    * path, or writing data into partitioned directory with dynamicPartitionOverwrite=true.
@@ -163,6 +170,8 @@ class HadoopMapReduceCommitProtocol(
   }
 
   override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
+    cleanPathToDelete()
+
     committer.commitJob(jobContext)
 
     if (hasValidPath) {
@@ -235,4 +244,41 @@ class HadoopMapReduceCommitProtocol(
       tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false)
     }
   }
+
+  /**
+   * If `canDeleteNow` is false, only record the path; it will be deleted at job commit time.
+   */
+  override def deleteWithJob(fs: FileSystem, path: Path,
+      canDeleteNow: Boolean = true): Boolean = {
+    if (canDeleteNow) {
+      super.deleteWithJob(fs, path)
+    } else {
+      val set = if (pathsToDelete.contains(fs)) {
+        pathsToDelete(fs)
+      } else {
+        new mutable.HashSet[Path]()
+      }
+
+      set.add(path)
+      pathsToDelete.put(fs, set)
+      true
+    }
+  }
+
+  private def cleanPathToDelete(): Unit = {
+    // Delete the paths whose deletion was deferred to job commit time.
+    for (fs <- pathsToDelete.keys) {
+      for (path <- pathsToDelete(fs)) {
+        try {
+          if (!fs.delete(path, true)) {
+            logWarning(s"Failed to delete path $path at job commit time")
+          }
+        } catch {
+          case ex: IOException =>
+            throw new IOException(s"Unable to clear output " +
+              s"file $path at job commit time", ex)
+        }
+      }
+    }
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index bf4d96fa18d0..7447be24f96e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -889,13 +889,17 @@ object DDLUtils {
    * Throws exception if outputPath tries to overwrite inputpath.
    */
   def verifyNotReadPath(query: LogicalPlan, outputPath: Path) : Unit = {
+    if (isInReadPath(query, outputPath)) {
+      throw new AnalysisException(
+        "Cannot overwrite a path that is also being read from.")
+    }
+  }
+
+  def isInReadPath(query: LogicalPlan, outputPath: Path): Boolean = {
     val inputPaths = query.collect {
       case LogicalRelation(r: HadoopFsRelation, _, _, _) => r.location.rootPaths
     }.flatten
 
-    if (inputPaths.contains(outputPath)) {
-      throw new AnalysisException(
-        "Cannot overwrite a path that is also being read from.")
-    }
+    inputPaths.contains(outputPath)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 3f41612c0806..721f057662b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -189,7 +189,6 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
       }
 
       val outputPath = t.location.rootPaths.head
-      if (overwrite) DDLUtils.verifyNotReadPath(actualQuery, outputPath)
 
       val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index dd7ef0d15c14..3783202cf920 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -114,7 +114,9 @@ case class InsertIntoHadoopFsRelationCommand(
           // For dynamic partition overwrite, do not delete partition directories ahead.
           true
         } else {
-          deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer)
+          val outputCheck = DDLUtils.isInReadPath(query, outputPath)
+          deleteMatchingPartitions(fs, outputCheck, qualifiedOutputPath,
+            customPartitionLocations, committer)
           true
         }
       case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
@@ -190,6 +192,7 @@ case class InsertIntoHadoopFsRelationCommand(
    */
   private def deleteMatchingPartitions(
       fs: FileSystem,
+      outputCheck: Boolean,
       qualifiedOutputPath: Path,
       customPartitionLocations: Map[TablePartitionSpec, String],
       committer: FileCommitProtocol): Unit = {
@@ -207,9 +210,23 @@ case class InsertIntoHadoopFsRelationCommand(
     }
     // first clear the path determined by the static partition keys (e.g. /table/foo=1)
     val staticPrefixPath = qualifiedOutputPath.suffix(staticPartitionPrefix)
-    if (fs.exists(staticPrefixPath) && !committer.deleteWithJob(fs, staticPrefixPath, true)) {
-      throw new IOException(s"Unable to clear output " +
-        s"directory $staticPrefixPath prior to writing to it")
+    if (fs.exists(staticPrefixPath)) {
+      if (staticPartitionPrefix.isEmpty && outputCheck) {
+        // Input contains the output; delete only the output's child files, deferring to job commit.
+        val files = fs.listFiles(staticPrefixPath, false)
+        while (files.hasNext) {
+          val file = files.next()
+          if (!committer.deleteWithJob(fs, file.getPath, false)) {
+            throw new IOException(s"Unable to clear output " +
+              s"file ${file.getPath} prior to writing to it")
+          }
+        }
+      } else {
+        if (!committer.deleteWithJob(fs, staticPrefixPath, true)) {
+          throw new IOException(s"Unable to clear output " +
+            s"directory $staticPrefixPath prior to writing to it")
+        }
+      }
     }
     // now clear all custom partition locations (e.g. /custom/dir/where/foo=2/bar=4)
     for ((spec, customLoc) <- customPartitionLocations) {
@@ -248,4 +265,5 @@ case class InsertIntoHadoopFsRelationCommand(
       }
     }.toMap
   }
+
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index fef01c860db6..7e4619f5e3db 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -231,16 +231,16 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
     }
   }
 
-  test("it is not allowed to write to a table while querying it.") {
-    val message = intercept[AnalysisException] {
-      sql(
+  test("allowed to write to a table while querying it.") {
+    val expected = sql("SELECT * FROM jsonTable").collect().toSeq
+    sql(
       s"""
         |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jsonTable
       """.stripMargin)
-    }.getMessage
-    assert(
-      message.contains("Cannot overwrite a path that is also being read from."),
-      "INSERT OVERWRITE to a table while querying it should not be allowed.")
+
+    checkAnswer(
+      sql("SELECT * FROM jsonTable"),
+      expected)
   }
 
   test("Caching") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index d93215fefb81..c46b2e475152 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1184,7 +1184,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     }
   }
 
-  test("insertInto - source and target are the same table") {
+  test("insertInto - source and target can be the same table") {
     val tableName = "tab1"
     withTable(tableName) {
       Seq((1, 2)).toDF("i", "j").write.saveAsTable(tableName)
@@ -1204,10 +1204,11 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         table(tableName),
         Seq(Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2)))
 
-      val e = intercept[AnalysisException] {
-        table(tableName).write.mode(SaveMode.Overwrite).insertInto(tableName)
-      }.getMessage
-      assert(e.contains(s"Cannot overwrite a path that is also being read from"))
+      table(tableName).write.mode(SaveMode.Overwrite).insertInto(tableName)
+      checkAnswer(
+        table(tableName),
+        Seq(Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2))
+      )
     }
   }
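
A minimal sketch of the contract this patch introduces, assuming Spark core with the patch applied is on the classpath: calling deleteWithJob with canDeleteNow = false only records the path, and HadoopMapReduceCommitProtocol removes the recorded paths inside commitJob(), before the newly written files are committed. That is what lets InsertIntoHadoopFsRelationCommand keep reading the old output files while overwriting them. The LoggingCommitProtocol subclass below is hypothetical and not part of the patch.

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol

// Hypothetical subclass: logs every deferred delete before handing it to the parent,
// which records the path and deletes it when the job commits.
class LoggingCommitProtocol(jobId: String, outputPath: String)
  extends HadoopMapReduceCommitProtocol(jobId, outputPath) {

  override def deleteWithJob(fs: FileSystem, path: Path, canDeleteNow: Boolean): Boolean = {
    if (!canDeleteNow) {
      // The old files stay readable until commitJob() runs the deferred deletes.
      logInfo(s"Deferring delete of $path to job commit")
    }
    super.deleteWithJob(fs, path, canDeleteNow)
  }
}

Note that InsertIntoHadoopFsRelationCommand only takes the deferred branch when DDLUtils.isInReadPath(query, outputPath) is true and the write targets the table root (staticPartitionPrefix is empty), so the common overwrite case still clears the output directory up front.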