
Conversation

@zheh12 commented May 7, 2018

What changes were proposed in this pull request?

When we INSERT OVERWRITE a parquet table, there is a check:

 if (overwrite) DDLUtils.verifyNotReadPath(actualQuery, outputPath)

The check throws an exception if the output path is also one of the input paths. This check (limitation) only exists for datasource tables, not Hive tables. Shall we remove it?

We cannot read from and overwrite a HadoopFsRelation with the same path; the input and output must differ because Spark deletes the output partition path before reading. This PR proposes to mark/cache the paths to delete before reading, and to postpone the actual deletion until job commit.
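A minimal sketch of that idea, with illustrative class and field names (this is not the actual patch; it assumes Spark's FileCommitProtocol.deleteWithJob hook and the HadoopMapReduceCommitProtocol base class):

    import scala.collection.mutable
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.hadoop.mapreduce.JobContext
    import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
    import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol

    // Illustrative subclass: record paths instead of deleting them eagerly,
    // then perform the actual deletion at job commit.
    class DeferredDeleteCommitProtocol(jobId: String, outputPath: String)
      extends HadoopMapReduceCommitProtocol(jobId, outputPath) {

      // Paths marked for deletion while the job may still be reading them.
      private val pathsToDelete = mutable.ArrayBuffer[(FileSystem, Path)]()

      override def deleteWithJob(fs: FileSystem, path: Path, recursive: Boolean): Boolean = {
        pathsToDelete += ((fs, path)) // defer: don't delete while inputs are being read
        true
      }

      override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
        // All inputs have been read by now, so the old files can go before
        // the new output is made visible.
        for ((fs, path) <- pathsToDelete) {
          fs.delete(path, true)
        }
        super.commitJob(jobContext, taskCommits)
      }
    }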

How was this patch tested?

Updated InsertSuite and MetastoreDataSourceSuite.

@zheh12 zheh12 changed the title [SPARK-24194] HadoopFsRelation cannot overwrite a path that is also b… [SPARK-24194] [SQL]HadoopFsRelation cannot overwrite a path that is also b… May 7, 2018
@zheh12 (Author) commented May 7, 2018

cc @cloud-fan @jiangxb1987
The core idea is to cache the partition paths and delete them at job commit, so the input and output paths can be the same. Are there any drawbacks to this idea? Please advise when you have time.

@zheh12 zheh12 changed the title [SPARK-24194] [SQL]HadoopFsRelation cannot overwrite a path that is also b… [SPARK-24194] [SQL]HadoopFsRelation Cannot overwrite a path that is also being read from. May 7, 2018
@zheh12 zheh12 changed the title [SPARK-24194] [SQL]HadoopFsRelation Cannot overwrite a path that is also being read from. [SPARK-24194] [SQL]HadoopFsRelation cannot overwrite a path that is also being read from. May 7, 2018
Contributor:

This deletes leaf files one by one; have you evaluated the performance difference?

Author:

First, if it is the root directory of the table, I must record all the files in the directory and wait until the job is committed to delete them. Because the job's _temporary directory is also inside it, I cannot delete the entire directory directly.

Second, when we record the files that need to be deleted, we only list the files in the root directory non-recursively. Under normal circumstances, the number of files in the first-level directory of a partitioned table will not be too large.

In the end, this will certainly be slower than deleting the entire directory directly, but under the current implementation we cannot delete the entire table directory.

Contributor:

Is it possible to do this only for overwrite? We should not introduce a performance regression when it is not necessary.

Contributor:

Have you considered the approach taken by dynamicPartitionOverwrite, i.e. using a staging directory?

Author:

  1. From the code's point of view, in the current implementation deleteMatchingPartitions happens only if overwrite is specified.
  2. Using dynamicPartitionOverwrite will not solve this problem, because it also generates a .stage directory under the table root directory. We would still need to record all the files to delete, and we still could not delete the root directory directly.
    Dynamic partition overwrite actually records all the partitions that need to be deleted and then deletes them one by one, while a whole-table overwrite deletes all the data in the directory and needs to record every partition directory and file to delete, so the implementation would in fact be similar to dynamicPartitionOverwrite.

Author:

If I do this, then when the job is committed it will delete the entire output directory, and there will be no data left.

Contributor:

We delete the files just before committing the job, don't we?

Author:

The key is that the data is already in the output directory before the job commits, so we can no longer delete the output directory.

We override FileCommitProtocol's deleteWithJob method in HadoopMapReduceCommitProtocol. It no longer deletes the file immediately; it waits until the entire job is committed.

We do delete the files when committing the job, but temporary output files are generated when each task starts. These temporary files live inside the output directory, and the data is then moved from them into the output directory.

After the job starts, there is no safe time to delete the entire output directory.

Contributor:

OK, then how about adding a new parameter canDeleteNow: Boolean to FileCommitProtocol.deleteWithJob?
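Presumably something like the following (a hypothetical sketch of the suggested signature; the default keeps the old eager behaviour, and pathsToDelete is the deferred-deletion buffer from the sketch above):

    // Hypothetical: callers that still need to read the path pass
    // canDeleteNow = false, deferring the delete to job commit.
    override def deleteWithJob(
        fs: FileSystem,
        path: Path,
        recursive: Boolean,
        canDeleteNow: Boolean = true): Boolean = {
      if (canDeleteNow) {
        fs.delete(path, recursive) // old behaviour: delete eagerly
      } else {
        pathsToDelete += ((fs, path)) // defer until commitJob
        true
      }
    }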

Author:

That's a good idea. I changed my code.

Author:

I have changed this code.

@cloud-fan (Contributor)

cc @ericl

@zheh12 force-pushed the SPARK-24194 branch 2 times, most recently from 19e6692 to a51620b, May 9, 2018 05:44
@cloud-fan (Contributor)

ok to test

@SparkQA commented May 9, 2018

Test build #90400 has finished for PR 21257 at commit a51620b.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zheh12 (Author) commented May 9, 2018

Jenkins, retest this please.

@zheh12 (Author) commented May 9, 2018

cc @cloud-fan, Jenkins hit an error; please help me retest, thanks.

@cloud-fan (Contributor)

retest this please

@SparkQA commented May 9, 2018

Test build #90413 has finished for PR 21257 at commit a51620b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor:

It seems recursive is always passed as true? Can we remove it?

Author:

In the current situation we could remove it, but I feel it is better to keep it with a default value of true.

Contributor:

Are there any (potential) cases where we would need a recursive parameter?

Author:

I will remove the recursive parameter.

Contributor:

Will this be different from stagingDir.getFileSystem(jobContext.getConfiguration)?

Author:

stagingDir is not always a valid Hadoop path, but the JobContext working directory always is.

Contributor:

Can we change the other places in this method to use the fs created here?

Contributor:

I'm not sure you can guarantee that the working dir is always on the destination FS. At least with @rdblue's committers, task attempt work dirs are on file:// and task commit (somehow) gets them to the dest FS in a form where job commit will make them visible.

Author:

I changed my code.
I now record the paths each FileSystem should delete in a map structure, and no longer assume they all use the same FileSystem.
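For illustration, that bookkeeping might look like this (names assumed, not the actual patch):

    // One set of paths per FileSystem, so each delete goes to the right FS
    // at job commit.
    private val pathsToDelete = mutable.Map[FileSystem, mutable.Set[Path]]()

    private def markForDeletion(fs: FileSystem, path: Path): Unit = {
      pathsToDelete.getOrElseUpdate(fs, mutable.Set.empty[Path]) += path
    }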

Contributor:

isReadPath?

Author:

Which is better: isInReadPath, inReadPath, or isReadPath?

Contributor:

I'd personally ignore a failure on delete(), as the contract for the API call is "if this doesn't raise an exception then the dest is gone". You can skip the exists check, as it is superfluous.
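Roughly this (a sketch of the suggested simplification; logWarning is assumed from Spark's Logging trait and IOException from java.io):

    // No exists() probe: delete() returning false just means nothing was
    // there. Only an exception means the destination may still exist.
    try {
      fs.delete(path, true)
    } catch {
      case e: IOException =>
        logWarning(s"Failed to delete $path: $e")
    }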

@SparkQA commented May 14, 2018

Test build #90593 has finished for PR 21257 at commit 273b1af.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 15, 2018

Test build #90619 has finished for PR 21257 at commit 6821795.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member)

retest this please

@SparkQA commented May 15, 2018

Test build #90632 has finished for PR 21257 at commit 6821795.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor:

  1. You don't need to do the exists check; it's just overhead. delete() will return false if there was nothing to delete.
  2. But... what if that delete throws an exception? Should the commit fail (as it does now), or be downgraded? As an example, the Hadoop FileOutputCommitter uses the option "mapreduce.fileoutputcommitter.cleanup-failures.ignored" to choose what to do there.
  3. ...and: what about cleanup in a job abort?

I think you'd be best off isolating this cleanup into its own method and calling it from both job commit and job abort (see the sketch below); in job commit, discuss with others what to do, and in job abort just log and continue.
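A sketch of that structure (illustrative names; it assumes the per-FileSystem pathsToDelete map sketched earlier, and the commit-time failure policy is left as comments):

    // Shared cleanup, called from both commitJob and abortJob.
    private def deleteRecordedPaths(ignoreFailures: Boolean): Unit = {
      for ((fs, paths) <- pathsToDelete; path <- paths) {
        try {
          fs.delete(path, true)
        } catch {
          case e: IOException =>
            if (ignoreFailures) {
              // job abort: just log and continue
              logWarning(s"Ignoring failure to delete $path: $e")
            } else {
              // job commit: whether to fail or downgrade is the open question
              throw new IOException(s"Unable to delete $path: $e", e)
            }
        }
      }
    }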

Author:

I think we should not delete the data when the task is aborted. The semantics of deleteWithJob should be to delete the data when the job is committed.
I changed the code to handle exceptions.

@SparkQA commented May 15, 2018

Test build #90636 has finished for PR 21257 at commit 803b0a0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 19, 2018

Test build #90826 has finished for PR 21257 at commit 6d496b1.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile (Member)

Update the PR title?

Member:

Nit: style issue. Can we follow the indents in the method declaration? https://github.com/databricks/scala-style-guide#spacing-and-indentation

@SparkQA commented May 22, 2018

Test build #90957 has finished for PR 21257 at commit f4f329c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 21, 2018

Test build #92165 has finished for PR 21257 at commit 89599e6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

for (path <- pathsToDelete(fs)) {
  try {
    if (!fs.delete(path, true)) {
      logWarning(s"Delete path ${path} failed at job commit time")
Contributor:

delete() -> false just means there was nothing there; I wouldn't warn at that point. Unless delete() throws an exception, you assume that when the call returns, fs.exists(path) does not hold, regardless of the return value. (Special exception: the dest is "/".)

} catch {
  case ex: IOException =>
    throw new IOException(s"Unable to clear output " +
      s"file ${path} at job commit time", ex)
Contributor:

Recommend including ex.toString() in the new exception raised, as child exception text can often get lost.

while (files.hasNext) {
  val file = files.next()
  if (!committer.deleteWithJob(fs, file.getPath, false)) {
    throw new IOException(s"Unable to clear output " +
Contributor:

As committer.deleteWithJob() returns true in the base class, that check won't do much, at least not with the default impl. Probably better just to have deleteWithJob() return Unit and require callers to raise an exception on a delete failure. Given that delete() is required to say "dest doesn't exist if you return", I don't think they need to do any checks at all.
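That suggestion, roughly sketched (hypothetical; not the actual API):

    // delete() only signals failure by throwing, so with a Unit return type
    // there is nothing left for callers to check.
    override def deleteWithJob(fs: FileSystem, path: Path, recursive: Boolean): Unit = {
      fs.delete(path, recursive) // an exception here fails the job
    }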

if (fs.exists(staticPrefixPath)) {
  if (staticPartitionPrefix.isEmpty && outputCheck) {
    // input contains output: only delete the output's sub-files at job commit
    val files = fs.listFiles(staticPrefixPath, false)
Contributor:

If there are a lot of files here, you've gone from a directory delete, which was O(1) on a filesystem and probably O(descendants) on an object store, to an O(children) operation on an FS and O(children * descendants(child)) on an object store. Not significant for a small number of files, but potentially expensive. Why do the iteration at all?

  }
} else {
  if (!committer.deleteWithJob(fs, staticPrefixPath, true)) {
    throw new IOException(s"Unable to clear output " +
Contributor:

Again, hard to see how this exception path would be reached.

/**
 * For now, just record the files to be deleted.
 */
override def deleteWithJob(fs: FileSystem, path: Path,
Contributor:

No need to worry about concurrent access here, correct?

@steveloughran (Contributor)

Some overall thoughts:

  • I think this only happens on a successful job commit, not on abort. Is that the desired behaviour?
  • If something goes wrong here, is failing the entire job the correct action? If the deletes happened earlier, then yes, the job would obviously fail. But now the core work has already taken place; it's just the cleanup failing, which could be permissions, a transient network issue, etc.

I'll have to look a bit closer at what happens in committer cleanup right now, though since it is focused on rm -f $dest/__temporary/$jobAttempt, it is less worried about failures there, as it shouldn't be changing any public datasets.

@AmplabJenkins

Can one of the admins verify this patch?

@HyukjinKwon (Member)

Ping @zheh12 to address the comments. I am going to suggest closing this for now while I am identifying PRs to close.

@asfgit closed this in a3ba3a8, Nov 11, 2018
zifeif2 pushed a commit to zifeif2/spark that referenced this pull request Nov 22, 2025
Closes apache#21766
Closes apache#21679
Closes apache#21161
Closes apache#20846
Closes apache#19434
Closes apache#18080
Closes apache#17648
Closes apache#17169

Add:
Closes apache#22813
Closes apache#21994
Closes apache#22005
Closes apache#22463

Add:
Closes apache#15899

Add:
Closes apache#22539
Closes apache#21868
Closes apache#21514
Closes apache#21402
Closes apache#21322
Closes apache#21257
Closes apache#20163
Closes apache#19691
Closes apache#18697
Closes apache#18636
Closes apache#17176

Closes apache#23001 from wangyum/CloseStalePRs.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>