@@ -120,8 +120,9 @@ abstract class FileCommitProtocol {
* Specifies that a file should be deleted with the commit of this job. The default
* implementation deletes the file immediately.
*/
def deleteWithJob(fs: FileSystem, path: Path, recursive: Boolean): Boolean = {
fs.delete(path, recursive)
def deleteWithJob(fs: FileSystem, path: Path,
canDeleteNow: Boolean = true): Boolean = {
fs.delete(path, true)
}
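
A hypothetical call site for the new flag, sketched here for illustration (committer, fs, and stalePath are placeholder names, not identifiers from this diff):

// Default canDeleteNow = true: delete immediately, as before.
committer.deleteWithJob(fs, stalePath)
// Record only; a deferring implementation removes the path at job commit.
committer.deleteWithJob(fs, stalePath, canDeleteNow = false)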

/**
@@ -17,13 +17,14 @@

package org.apache.spark.internal.io

import java.io.IOException
import java.util.{Date, UUID}

import scala.collection.mutable
import scala.util.Try

import org.apache.hadoop.conf.Configurable
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
@@ -84,6 +85,12 @@ class HadoopMapReduceCommitProtocol(
*/
@transient private var partitionPaths: mutable.Set[String] = null

/**
* Tracks the files to be deleted when the job commits.
*/
@transient private val pathsToDelete: mutable.Map[FileSystem,
mutable.Set[Path]] = mutable.HashMap()

/**
* The staging directory of this write job. Spark uses it to deal with files with absolute output
* path, or writing data into partitioned directory with dynamicPartitionOverwrite=true.
@@ -163,6 +170,8 @@ class HadoopMapReduceCommitProtocol(
}

override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
cleanPathToDelete()

committer.commitJob(jobContext)

if (hasValidPath) {
@@ -235,4 +244,41 @@ class HadoopMapReduceCommitProtocol(
tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false)
}
}

/**
* For now, just record the file to be deleted; the delete happens at job commit.
*/
override def deleteWithJob(fs: FileSystem, path: Path,
Contributor
No need to worry about concurrent access here, correct?

canDeleteNow: Boolean = true): Boolean = {
if (canDeleteNow) {
super.deleteWithJob(fs, path)
} else {
val set = if (pathsToDelete.contains(fs)) {
pathsToDelete(fs)
} else {
new mutable.HashSet[Path]()
}

set.add(path)
pathsToDelete.put(fs, set)
true
}
}
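
On the concurrency question above: if deleteWithJob could ever be called from more than one thread (an assumption this diff does not establish), a minimal thread-safe sketch of the recording branch would synchronize on the map; getOrElseUpdate also removes the contains/put round trip. A hypothetical variant, not the committed code:

} else {
  // Guard the shared map against concurrent task threads.
  pathsToDelete.synchronized {
    pathsToDelete.getOrElseUpdate(fs, mutable.HashSet[Path]()).add(path)
  }
  true
}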

private def cleanPathToDelete(): Unit = {
// First, delete the files that were recorded for deletion at job commit.
for (fs <- pathsToDelete.keys) {
for (path <- pathsToDelete(fs)) {
try {
if (!fs.delete(path, true)) {
logWarning(s"Delete path ${path} fail at job commit time")
Contributor
delete -> false just means there was nothing there; I wouldn't warn at that point. Unless delete() throws an exception, you can assume that when the call returns, fs.exists(path) does not hold, regardless of the return value. (Special exception: the dest is "/".)

}
} catch {
case ex: IOException =>
throw new IOException(s"Unable to clear output " +
s"file ${path} at job commit time", ex)
Contributor
recommend including ex.toString() in the new exception raised, as child exception text can often get lost

}
}
}
}
}
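
Folding both review suggestions into cleanPathToDelete gives a sketch like the following (illustrative only, not the committed code): the boolean result of delete() is ignored, since the path is guaranteed absent on a normal return, and the child exception's text is surfaced in the rethrown message.

private def cleanPathToDelete(): Unit = {
  // Delete every path recorded by deleteWithJob(..., canDeleteNow = false).
  for ((fs, paths) <- pathsToDelete; path <- paths) {
    try {
      fs.delete(path, true)
    } catch {
      case ex: IOException =>
        throw new IOException(
          s"Unable to clear output file $path at job commit time: $ex", ex)
    }
  }
}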
@@ -889,13 +889,17 @@ object DDLUtils {
* Throws an exception if outputPath tries to overwrite an input path.
*/
def verifyNotReadPath(query: LogicalPlan, outputPath: Path) : Unit = {
if (isInReadPath(query, outputPath)) {
throw new AnalysisException(
"Cannot overwrite a path that is also being read from.")
}
}

def isInReadPath(query: LogicalPlan, outputPath: Path): Boolean = {
val inputPaths = query.collect {
case LogicalRelation(r: HadoopFsRelation, _, _, _) => r.location.rootPaths
}.flatten

if (inputPaths.contains(outputPath)) {
throw new AnalysisException(
"Cannot overwrite a path that is also being read from.")
}
inputPaths.contains(outputPath)
}
}
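
For illustration, the refactor splits detection from enforcement, so callers can either fail fast as before or detect the overlap and handle it themselves (query and outputPath stand in for the caller's own values):

// Unchanged strict behavior: throws AnalysisException on overlap.
DDLUtils.verifyNotReadPath(query, outputPath)
// New usage: just detect the overlap and defer deletes instead
// (see InsertIntoHadoopFsRelationCommand below).
val readsFromOutput = DDLUtils.isInReadPath(query, outputPath)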
@@ -189,7 +189,6 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
}

val outputPath = t.location.rootPaths.head
if (overwrite) DDLUtils.verifyNotReadPath(actualQuery, outputPath)

val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append

@@ -114,7 +114,9 @@ case class InsertIntoHadoopFsRelationCommand(
// For dynamic partition overwrite, do not delete partition directories ahead.
true
} else {
deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer)
val outputCheck = DDLUtils.isInReadPath(query, outputPath)
deleteMatchingPartitions(fs, outputCheck, qualifiedOutputPath,
customPartitionLocations, committer)
true
}
case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
@@ -190,6 +192,7 @@ case class InsertIntoHadoopFsRelationCommand(
*/
private def deleteMatchingPartitions(
fs: FileSystem,
outputCheck: Boolean,
qualifiedOutputPath: Path,
customPartitionLocations: Map[TablePartitionSpec, String],
committer: FileCommitProtocol): Unit = {
@@ -207,9 +210,23 @@ case class InsertIntoHadoopFsRelationCommand(
}
// first clear the path determined by the static partition keys (e.g. /table/foo=1)
val staticPrefixPath = qualifiedOutputPath.suffix(staticPartitionPrefix)
if (fs.exists(staticPrefixPath) && !committer.deleteWithJob(fs, staticPrefixPath, true)) {
throw new IOException(s"Unable to clear output " +
s"directory $staticPrefixPath prior to writing to it")
if (fs.exists(staticPrefixPath)) {
if (staticPartitionPrefix.isEmpty && outputCheck) {
// The input contains the output; delete only the output's child files, at job commit.
val files = fs.listFiles(staticPrefixPath, false)
Contributor
If there are a lot of files here, you've gone from a dir delete, which was O(1) on a filesystem and probably O(descendants) on an object store, to an O(children) op on an FS and an O(children * descendants(child)) op on an object store. Not significant for a small number of files, but could potentially be expensive. Why do the iteration at all?

while (files.hasNext) {
val file = files.next()
if (!committer.deleteWithJob(fs, file.getPath, false)) {
throw new IOException(s"Unable to clear output " +
Contributor
As committer.deleteWithJob() returns true in the base class, that check won't do much, at least not with the default impl. Probably better just to have deleteWithJob() return Unit and require callers to raise an exception on a delete failure. Given that delete() is required to guarantee "dest doesn't exist if you return", I don't think they need to do any checks at all.

s"directory ${file.getPath} prior to writing to it")
}
}
} else {
if (!committer.deleteWithJob(fs, staticPrefixPath, true)) {
throw new IOException(s"Unable to clear output " +
Contributor
again, hard to see how this exception path would be reached.

s"directory $staticPrefixPath prior to writing to it")
}
}
}
// now clear all custom partition locations (e.g. /custom/dir/where/foo=2/bar=4)
for ((spec, customLoc) <- customPartitionLocations) {
@@ -248,4 +265,5 @@ case class InsertIntoHadoopFsRelationCommand(
}
}.toMap
}

}
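
Putting the pieces together, a comment-only trace of the deferred-delete flow for a self-overwrite such as INSERT OVERWRITE TABLE tab1 SELECT * FROM tab1:

// 1. DDLUtils.isInReadPath(query, outputPath) returns true, so outputCheck is set.
// 2. deleteMatchingPartitions registers each existing child file via
//    committer.deleteWithJob(fs, file.getPath, false) instead of deleting
//    the directory up front.
// 3. The job can therefore still read its input files while writing the
//    staged output.
// 4. commitJob() first runs cleanPathToDelete() to drop the old files, then
//    committer.commitJob() publishes the new ones.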
@@ -231,16 +231,16 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
}
}

test("it is not allowed to write to a table while querying it.") {
val message = intercept[AnalysisException] {
sql(
test("allowed to write to a table while querying it.") {
val df = sql(s"SELECT * FROM jsonTable")
sql(
s"""
|INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jsonTable
""".stripMargin)
}.getMessage
assert(
message.contains("Cannot overwrite a path that is also being read from."),
"INSERT OVERWRITE to a table while querying it should not be allowed.")

checkAnswer(
sql("SELECT * FROM jsonTable"),
df)
}

test("Caching") {
@@ -1184,7 +1184,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
}
}

test("insertInto - source and target are the same table") {
test("insertInto - source and target can be the same table") {
val tableName = "tab1"
withTable(tableName) {
Seq((1, 2)).toDF("i", "j").write.saveAsTable(tableName)
@@ -1204,10 +1204,11 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
table(tableName),
Seq(Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2)))

val e = intercept[AnalysisException] {
table(tableName).write.mode(SaveMode.Overwrite).insertInto(tableName)
}.getMessage
assert(e.contains(s"Cannot overwrite a path that is also being read from"))
table(tableName).write.mode(SaveMode.Overwrite).insertInto(tableName)
checkAnswer(
table(tableName),
Seq(Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2))
)
}
}
