[SPARK-18024][SQL] Introduce an internal commit protocol API #15696
Conversation
cc @ericl
Test build #67818 has finished for PR 15696 at commit
Test build #67832 has finished for PR 15696 at commit
Test build #67835 has finished for PR 15696 at commit
```diff
  override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
-   new Path(stagingDir, fileNamePrefix)
+   new Path(path)
```
+ extension?
Nope -- no more extension coming from Hadoop.
(This is now specified explicitly in OutputWriterFactory.getFileExtension)
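For context, here is a minimal sketch of where the extension now comes from, assuming the `getFileExtension` hook named above; the class body and the Parquet example are illustrative, not this PR's exact code:

```scala
import org.apache.hadoop.mapreduce.TaskAttemptContext

// Sketch: the data source's writer factory, rather than Hadoop's work-file
// machinery, decides the extension of each output file.
abstract class OutputWriterFactory extends Serializable {
  // e.g. a Parquet source might return ".snappy.parquet" (illustrative)
  def getFileExtension(context: TaskAttemptContext): String
}
```

Presumably this value then feeds the `ext` argument of `addTaskTempFile` discussed below, which is why `getDefaultWorkFile` can return the bare path.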
Test build #67838 has finished for PR 15696 at commit
Test build #67840 has finished for PR 15696 at commit
```scala
/**
 * An interface to define how a Spark job commits its outputs. Implementations must be serializable.
```
add: since the same committer instance set up on the driver will be used for tasks.
| * The "dir" parameter specifies 2, and "ext" parameter specifies both 4 and 5, and the rest | ||
| * are left to the commit protocol implementation to decide. | ||
| */ | ||
| def addTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String |
s/add/new?
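Pulling the visible pieces of this diff together, the protocol appears to be roughly the following trait. This is a sketch assembled from the signatures quoted in this thread; the job-level methods and the `TaskCommitMessage` type are assumptions based on the surrounding discussion, not the exact merged code:

```scala
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}

// Sketch only: serializable because the instance set up on the driver is
// shipped to executors (see the comment above).
abstract class CommitProtocol extends Serializable {
  // Driver side: bracket the whole job.
  def setupJob(jobContext: JobContext): Unit
  def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit
  def abortJob(jobContext: JobContext): Unit

  // Executor side: bracket each task's writes.
  def setupTask(taskContext: TaskAttemptContext): Unit
  def addTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String
  def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage
  def abortTask(taskContext: TaskAttemptContext): Unit
}

// Assumed carrier for whatever a successful task commit reports to the driver.
class TaskCommitMessage(val payload: Any) extends Serializable
```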
```scala
/**
 * Notifies the commit protocol to add a new file, and gets back the full path that should be
 * used. Must be called on the executors when running tasks.
```
add: Note that the returned temp file may have an arbitrary path. The commit protocol only promises that the file will be at the location specified by the arguments after job commit.
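To make the suggested note concrete, a hypothetical task-side use (the helper and names are invented; `CommitProtocol` is the trait sketched above):

```scala
import org.apache.hadoop.mapreduce.TaskAttemptContext

// Illustrative only: write one task's output through the commit protocol.
def writeTaskOutput(committer: CommitProtocol, taskContext: TaskAttemptContext,
                    lines: Iterator[String]): Unit = {
  // `dir = None` targets the output root; `ext` is the data source's extension.
  val tempFile = committer.addTaskTempFile(taskContext, dir = None, ext = ".txt")
  // `tempFile` may point anywhere (e.g. into a staging directory); only after
  // a successful job commit is the data promised to be at its final location.
  val out = new java.io.PrintWriter(tempFile)
  try lines.foreach(out.println) finally out.close()
}
```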
```scala
/**
 * Aborts a task after the writes have failed. Must be called on the executors when running tasks.
 */
def abortTask(taskContext: TaskAttemptContext): Unit
```
Is this also best-effort?
```scala
 *
 * Unlike Hadoop's OutputCommitter, this implementation is serializable.
 */
class MapReduceFileCommitterProtocol(path: String, isAppend: Boolean)
```
Should we call this HadoopCommitProtocolWrapper or something to be more clear?
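Whatever the final name, the serializability point in the class comment is the interesting part. Here is a minimal sketch of how that can work, with the non-serializable Hadoop committer held in a `@transient` field and rebuilt from the context on each node; this is an assumed structure, not the PR's exact code, and `isAppend` handling is omitted:

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, JobStatus, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

class MapReduceFileCommitterProtocol(path: String, isAppend: Boolean)
  extends CommitProtocol {

  // Never serialized; recreated from the Hadoop context on whichever node runs.
  @transient private var committer: FileOutputCommitter = _

  override def setupJob(jobContext: JobContext): Unit =
    new FileOutputCommitter(new Path(path), jobContext).setupJob(jobContext)

  override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit =
    new FileOutputCommitter(new Path(path), jobContext).commitJob(jobContext)

  override def abortJob(jobContext: JobContext): Unit =
    new FileOutputCommitter(new Path(path), jobContext)
      .abortJob(jobContext, JobStatus.State.FAILED)

  override def setupTask(taskContext: TaskAttemptContext): Unit = {
    committer = new FileOutputCommitter(new Path(path), taskContext)
    committer.setupTask(taskContext)
  }

  override def addTaskTempFile(taskContext: TaskAttemptContext,
                               dir: Option[String], ext: String): String = {
    // Place the file under the committer's work path; Hadoop moves it on commit.
    val name = f"part-${taskContext.getTaskAttemptID.getTaskID.getId}%05d$ext"
    val parent = dir.map(new Path(committer.getWorkPath, _)).getOrElse(committer.getWorkPath)
    new Path(parent, name).toString
  }

  override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
    committer.commitTask(taskContext)
    new TaskCommitMessage(null)
  }

  override def abortTask(taskContext: TaskAttemptContext): Unit =
    if (committer != null) committer.abortTask(taskContext)
}
```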
```diff
  final def filePrefix(split: Int, uuid: String, bucketId: Option[Int]): String = {
    val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
-   f"part-r-$split%05d-$uuid$bucketString"
+   f"part-$split%05d-$uuid$bucketString"
```
Is this still used?
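For reference, the rename just drops the legacy MapReduce "r" (reduce) marker from output file names. With made-up arguments, and assuming `BucketingUtils.bucketIdToString` pads the bucket id to five digits:

```scala
filePrefix(3, "f1e2d3", None)    // was "part-r-00003-f1e2d3", now "part-00003-f1e2d3"
filePrefix(3, "f1e2d3", Some(7)) // now "part-00003-f1e2d3_00007"
```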
Test build #67841 has finished for PR 15696 at commit
Test build #67852 has finished for PR 15696 at commit
Closing this in favor of #15707
What changes were proposed in this pull request?
This patch introduces an internal commit protocol API that the batch data source write path uses to commit its output. It currently has a single implementation built on Hadoop MapReduce's OutputCommitter API. In the future, this commit API can be used to unify streaming and batch commits.
How was this patch tested?
Should be covered by existing write tests.
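For readers landing here from SPARK-18024, a hedged sketch of the commit lifecycle the API implies, written against the trait sketched in the review thread above. The orchestration is illustrative; in Spark the task-side block would run inside executor tasks, not a local `map`:

```scala
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}

// Illustrative end-to-end flow over the sketched CommitProtocol trait.
def runWriteJob(committer: CommitProtocol, jobContext: JobContext,
                taskContexts: Seq[TaskAttemptContext]): Unit = {
  committer.setupJob(jobContext) // driver, before any task starts
  try {
    val commits = taskContexts.map { tc => // conceptually one executor task each
      committer.setupTask(tc)
      try {
        val file = committer.addTaskTempFile(tc, dir = None, ext = ".txt")
        // ... write this task's partition to `file` ...
        committer.commitTask(tc)
      } catch {
        case e: Throwable =>
          committer.abortTask(tc) // best-effort cleanup (see question above)
          throw e
      }
    }
    committer.commitJob(jobContext, commits) // driver: publish all task output
  } catch {
    case e: Throwable =>
      committer.abortJob(jobContext) // best-effort cleanup of the whole job
      throw e
  }
}
```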