Skip to content

Commit 4296612

Browse files
committed
Tue Nov 8 20:04:12 PST 2016
1 parent fbd7b42 commit 4296612

File tree

3 files changed

+11
-2
lines changed

3 files changed

+11
-2
lines changed

core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,18 @@ abstract class FileCommitProtocol {
8282
*
8383
* The "dir" parameter specifies 2, and "ext" parameter specifies both 4 and 5, and the rest
8484
* are left to the commit protocol implementation to decide.
85+
*
86+
* Important: it is the caller's responsibility to add uniquely identifying content to "ext"
87+
* if a task is going to write out multiple files to the same dir.
8588
*/
8689
def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String
8790

8891
/**
8992
* Similar to newTaskTempFile(), but allows files to committed to an absolute output location.
9093
* Depending on the implementation, there may be weaker guarantees around adding files this way.
94+
*
95+
* Important: it is the caller's responsibility to add uniquely identifying content to "ext"
96+
* if a task is going to write out multiple files at all (even to different directories).
9197
*/
9298
def newTaskTempFileAbsPath(
9399
taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -321,8 +321,10 @@ object FileFormatWriter extends Logging {
321321
None
322322
}
323323
val path = if (customPath.isDefined) {
324+
// We need to include a uuid here since the commit protocol does not guarantee that
325+
// temp files requested by the same task for absolute placement do not collide.
324326
committer.newTaskTempFileAbsPath(
325-
taskAttemptContext, customPath.get, java.util.UUID.randomUUID().toString + ext)
327+
taskAttemptContext, customPath.get, UUID.randomUUID().toString + ext)
326328
} else {
327329
committer.newTaskTempFile(taskAttemptContext, partDir, ext)
328330
}

sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,8 @@ class PartitionProviderCompatibilitySuite
191191
*/
192192

193193
/**
194-
* Runs a test against a multi-level partitioned table, and validate some post-test invariants.
194+
* Runs a test against a multi-level partitioned table, then validates that the custom locations
195+
* were respected by the output writer.
195196
*
196197
* The initial partitioning structure is:
197198
* /p1=0/p2=0 -- custom location a

0 commit comments

Comments
 (0)