[SPARK-24195][Core] Ignore the files with "local" scheme in SparkContext.addFile #21533
Conversation
cc @felixcheung. Please take a look at this when you have time. Thanks.
Test build #91682 has finished for PR 21533 at commit
case null | "local" => new File(path).getCanonicalFile.toURI.toString
case null | "local" =>
  // SPARK-24195: Local is not a valid scheme for FileSystem, we should only keep path here.
  uri = new Path(uri.getPath).toUri
Why is this needed? Can't we just do new File(uri.getPath).getCanonicalFile.toURI.toString without this line?
Yes, same question. The line above doesn't seem useful.
it changes uri - which is referenced again below.
Yes, just as @felixcheung said, this is because we will use uri in https://github.com/apache/spark/pull/21533/files/f922fd8c995164cada4a8b72e92c369a827def16#diff-364713d7776956cb8b0a771e9b62f82dR1557; if the uri has the local scheme, we'll get an exception because local is not a valid scheme for FileSystem.
I mean getPath doesn't include the scheme:
scala> new Path("local:///a/b/c")
res0: org.apache.hadoop.fs.Path = local:/a/b/c
scala> new Path("local:///a/b/c").toUri
res1: java.net.URI = local:///a/b/c
scala> new Path("local:///a/b/c").toUri.getScheme
res2: String = local
scala> new Path("local:///a/b/c").toUri.getPath
res3: String = /a/b/c
why should we do this again?
scala> new Path(new Path("local:///a/b/c").toUri.getPath).toUri.getPath
res4: String = /a/b/c
Yea we can simplify this.
@HyukjinKwon @jiangxb1987
Thanks for your explanation, I think I understand what you mean about getPath not including the scheme. Actually, the purpose of this code, uri = new Path(uri.getPath).toUri, is to reassign the var at +1520; we don't want the uri to include the local scheme.
Can't we just do new File(uri.getPath).getCanonicalFile.toURI.toString without this line?
We can't, because as I explained above, if we don't do uri = new Path(uri.getPath).toUri, we will get an exception like the one below:
No FileSystem for scheme: local
java.io.IOException: No FileSystem for scheme: local
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2586)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2593)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2632)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2614)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
at org.apache.spark.util.Utils$.getHadoopFileSystem(Utils.scala:1830)
at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:690)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:486)
at org.apache.spark.SparkContext.addFile(SparkContext.scala:1557)
I mean, at least we can do:
val a = new File(uri.getPath).getCanonicalFile.toURI.toString
uri = new Path(uri.getPath).toUri
a
new Path(uri.getPath).toUri for trimming the scheme looks not quite clean though. It's a-okay at least to me.
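For context, a minimal sketch of how that suggestion could slot into the scheme match in addFile (the variable setup here is an assumption for illustration, not the exact SparkContext code):

```scala
import java.io.File
import java.net.URI
import org.apache.hadoop.fs.Path

// Sketch only: paraphrases the idea discussed above, not the exact SparkContext source.
val path = "local:///opt/data/lookup.txt"   // hypothetical input to addFile
var uri: URI = new URI(path)

val key = uri.getScheme match {
  case null | "local" =>
    // Compute the return value first, then strip the scheme so that any later
    // FileSystem lookup on `uri` never sees the unsupported "local" scheme.
    val k = new File(uri.getPath).getCanonicalFile.toURI.toString
    uri = new Path(uri.getPath).toUri
    k
  case _ => path
}
```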
Ah, I see, thanks. I'll do this in the next commit. Thanks for your patient explanation.
// file and absolute path for path with local scheme
val file3 = File.createTempFile("someprefix3", "somesuffix3", dir)
val localPath = "local://" + file3.getParent + "/../" + file3.getParentFile.getName +
Let's use string interpolation.
Got it, thanks.
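For illustration, the interpolated form of the construction above might look like this (the trailing file-name segment is an assumption, since the original line is truncated):

```scala
import java.io.File
import java.nio.file.Files

// Hypothetical string-interpolated rewrite of the truncated line above; the final
// "/${file3.getName}" segment is assumed rather than copied from the PR.
val dir = Files.createTempDirectory("spark-addfile-test").toFile  // stand-in for the suite's temp dir
val file3 = File.createTempFile("someprefix3", "somesuffix3", dir)
val localPath = s"local://${file3.getParent}/../${file3.getParentFile.getName}/${file3.getName}"
```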
sc.addFile(file1.getAbsolutePath)
sc.addFile(relativePath)
sc.addFile(localPath)
sc.parallelize(Array(1), 1).map(x => {
nit:
map { x =>
...
}
Got it, I'll fix it in the next commit.
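A minimal sketch of the suggested brace style, with a placeholder body rather than the PR's actual assertions (sc is created locally here instead of coming from the suite fixture):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Placeholder example only; the real test asserts on the files fetched via SparkFiles.
val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("brace-style-nit"))

sc.parallelize(Array(1), 1).map { x =>
  // assertions on the fetched files would go here in the real test
  x
}.count()

sc.stop()
```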
Seems fine to me.
felixcheung
left a comment
Can you give some examples of what the output for local:/ looks like in the change to addFile()?
}
if (absolutePath2 == gotten2.getAbsolutePath) {
  throw new SparkException("file should have been copied : " + absolutePath2)
}
can we not change the existing test?
Actually I keep all the existing tests and just did some cleanup to reduce duplicated code by adding a function checkGottenFile in https://github.com/apache/spark/pull/21533/files/f922fd8c995164cada4a8b72e92c369a827def16#diff-8d5858d578a2dda1a2edb0d8cefa4f24R139. If you think it's unnecessary, I'll just change it back.
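For context, a helper like the one described might look roughly as follows; the name checkGottenFile comes from the comment above, while the parameters and assertions are assumptions rather than the PR's actual code:

```scala
import java.io.File
import org.apache.spark.{SparkException, SparkFiles}

// Hypothetical sketch of a shared assertion helper for the addFile tests;
// the real checkGottenFile in the PR may take different parameters.
def checkGottenFile(fileName: String, expectedAbsolutePath: String, shouldBeCopied: Boolean): Unit = {
  val gotten = new File(SparkFiles.get(fileName))
  if (!gotten.exists()) {
    throw new SparkException("file doesn't exist : " + expectedAbsolutePath)
  }
  if (shouldBeCopied && expectedAbsolutePath == gotten.getAbsolutePath) {
    throw new SparkException("file should have been copied : " + expectedAbsolutePath)
  }
}
```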
Test build #91780 has finished for PR 21533 at commit

LGTM except for the comment from @HyukjinKwon
Just took another look at this issue. I think the fix just makes it work, but not work correctly. The fix here actually treats the "local" scheme as "file", but they're different in Spark. In Spark the "local" scheme means resources are already on the driver/executor nodes, which means Spark doesn't need to ship resources from the driver to executors via the file server. But here it is treated as "file", which will be shipped via the file server to executors. This is semantically not correct. I think for the "local" scheme, the fix should:
Exactly, I agree with ^. It would be best to implement that logic. It seems we have never implemented the "local" logic as described so far, so I thought it was kind of okay.

"local" scheme was supported long ago for users who already deploy jars on every node. HDI heavily uses this feature.
Ah, then maybe I missed some history in the transitive changes.
@jerryshao Many thanks for your review and detailed explanation. Based on your guidance, I found the behavior about the file in
Test build #92001 has finished for PR 21533 at commit
retest this please.
Test build #92027 has finished for PR 21533 at commit
val tmpPath = new File(uri.getPath).getCanonicalFile.toURI.toString
// SPARK-24195: Local is not a valid scheme for FileSystem, we should only keep path here.
uri = new Path(uri.getPath).toUri
tmpPath
I think the change here makes a file with the "local" scheme a no-op. This makes me wonder whether supporting the "local" scheme in addFile is meaningful or not. Because a file with the "local" scheme already exists on every node, and the user should already be aware of it, adding it seems not meaningful.
Looking at the similar method addJar, a "local" jar is properly handled there without adding it to the fileServer, and is properly converted to the right scheme used by the classloader.
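For reference, the addJar handling being described is roughly along these lines (a paraphrased sketch, not the exact source; addToFileServer stands in for the real helper that registers a jar with the file server):

```scala
import java.io.File
import java.net.URI

// Rough sketch of the scheme handling in addJar as described above: a "local" jar is
// not pushed to the file server, its URI is simply rewritten to a "file:" path that
// the executor-side classloader can load directly.
def keyForJar(path: String, addToFileServer: File => String): String = {
  val uri = new URI(path)
  uri.getScheme match {
    case null | "file" => addToFileServer(new File(uri.getPath)) // shipped via file server
    case "local"       => "file:" + uri.getPath                  // already on every node
    case _             => path
  }
}
```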
This makes me wonder whether supporting the "local" scheme in addFile is meaningful or not. Because a file with the "local" scheme already exists on every node, and the user should already be aware of it, adding it seems not meaningful.
Yeah, I agree with you. The last change tries to treat a "local" file without adding it to the fileServer and to correct its scheme to "file:", but maybe adding a local file is itself a no-op? So should we just forbid users from passing a file with the "local" scheme to addFile?
Test build #92379 has finished for PR 21533 at commit
case "local" =>
  logWarning("We do not support add a local file here because file with local scheme is " +
    "already existed on every node, there is no need to call addFile to add it again. " +
    "(See more discussion about this in SPARK-24195.)")
Can we please rephrase to "File with 'local' scheme is not supported to add to file server, since it is already available on every node."?
Got it, rephrase done.
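Presumably the branch ends up looking something like the sketch below, combining the diff above with the suggested wording (an assumption based on this exchange, not a copy of the final commit):

```scala
import java.net.URI

// Hypothetical final form of the "local" branch; `logWarning` here is a stand-in for
// the method SparkContext inherits from its logging trait.
def logWarning(msg: => String): Unit = Console.err.println(s"WARN $msg")

def addFileSketch(path: String): Unit = {
  new URI(path).getScheme match {
    case "local" =>
      logWarning("File with 'local' scheme is not supported to add to file server, " +
        "since it is already available on every node.")
    case _ =>
      // other schemes would be handled as before (omitted in this sketch)
  }
}
```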
I think maybe we could:

@jiangxb1987 @vanzin what's your opinion?

Test build #92410 has finished for PR 21533 at commit

retest this please.

Test build #92415 has finished for PR 21533 at commit

Test build #4219 has finished for PR 21533 at commit

Test build #4220 has finished for PR 21533 at commit

retest this please.

Please also update the title and PR description because we changed the proposed solution in the middle.

Test build #93259 has finished for PR 21533 at commit

@jiangxb1987 Thanks for the reminder, rephrasing done.
jerryshao
left a comment
LGTM, @jiangxb1987 WDYT?
lgtm too

SGTM too

Merging to master branch. Thanks all!

Thanks everyone for your help!
What changes were proposed in this pull request?
In Spark, the "local" scheme means resources are already on the driver/executor nodes, so this PR ignores files with the "local" scheme in SparkContext.addFile to fix a potential bug.

How was this patch tested?
Existing tests.
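As a rough illustration of the resulting behavior (the paths below are hypothetical), a local: URI passed to addFile is now skipped with a warning instead of being registered with the file server:

```scala
import java.io.File
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical usage illustrating the behavior described in this PR: a "local:" path is
// assumed to already exist on every node, so addFile only logs a warning for it.
val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("SPARK-24195-demo"))

val driverFile: File = Files.createTempFile("spark24195", ".txt").toFile
sc.addFile(driverFile.getAbsolutePath)      // shipped to executors via the file server
sc.addFile("local:///opt/data/lookup.txt")  // ignored with a warning after this change

sc.stop()
```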