[SPARK-25299] Add shuffle map output un-registration hooks upon fetch failure #609
Conversation
…e map outputs or not
This reverts commit 5abde44.
@@ -36,31 +35,8 @@ class PluginShuffleDataIO(sparkConf: SparkConf) extends ShuffleDataIO {

class PluginShuffleDriverComponents(delegate: ShuffleDriverComponents)
I'd like to write more comprehensive tests with a particular implementation of checking for persisted files.
@@ -1673,8 +1673,11 @@ private[spark] class DAGScheduler(
// TODO: mark the executor as failed only if there were lots of fetch failures on it
if (bmAddress != null) {
  if (bmAddress.executorId == null) {
    if (shuffleDriverComponents.unregistrationStrategyOnFetchFailure() ==
        MapOutputUnregistrationStrategy.HOST) {
  if (unRegisterOutputOnHostOnFetchFailure &&
Let's carefully consider if this is correct - not entirely sure, to be honest.
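For discussion, here is a self-contained sketch of the decision this branch encodes: the boolean config is replaced by a three-way strategy returned by the plugin's driver components. Only `HOST` appears in the diff, so the other member names and the helper below are assumptions, not the exact code in this PR.

```scala
// Illustrative model of the branch above, for discussion only.
// Only HOST is visible in the diff; EXECUTOR and NONE are assumed names for
// the other two modes of the three-way switch described later in this thread.
object UnregistrationDecisionSketch {

  sealed trait MapOutputUnregistrationStrategy
  case object HOST extends MapOutputUnregistrationStrategy     // drop every output on the failed host
  case object EXECUTOR extends MapOutputUnregistrationStrategy // drop only the failed executor's outputs
  case object NONE extends MapOutputUnregistrationStrategy     // keep all outputs registered

  /** Which scope of map outputs a fetch failure at (host, execId) would unregister. */
  def scopeToUnregister(
      strategy: MapOutputUnregistrationStrategy,
      host: String,
      execId: String): Option[String] = strategy match {
    case HOST     => Some(s"all outputs on host $host")
    case EXECUTOR => Some(s"outputs written by executor $execId")
    case NONE     => None // e.g. the files live in remote storage and are still readable
  }

  def main(args: Array[String]): Unit = {
    println(scopeToUnregister(HOST, "host-1", "exec-7"))
    println(scopeToUnregister(NONE, "host-1", "exec-7"))
  }
}
```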
@@ -71,6 +72,7 @@ class SparkEnv (
    val metricsSystem: MetricsSystem,
    val memoryManager: MemoryManager,
    val outputCommitCoordinator: OutputCommitCoordinator,
    val shuffleDataIo: ShuffleDataIO,
Is it weird to have the shuffleDataIO as part of the developer API? My concern is that users could use shuffleDataIO.driver() in the executor and vice versa, but maybe it's not a problem.
If you wanted to get around this, you could pass the driver components, or pass in a function, as part of the method call to MapOutputTracker, since you already have the driver components in the DAGScheduler.
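Roughly what that alternative might look like; none of these signatures exist in Spark, they are only meant to show the driver components (or a function derived from them) travelling through the method call instead of through SparkEnv.

```scala
// Hypothetical shape of the alternative suggested above; names and signatures
// are invented. The tracker never holds ShuffleDataIO itself: the scheduler,
// which already owns the driver components, passes in only the decision needed.
class MapOutputTrackerSketch {
  def unregisterOutputsOnFetchFailure(
      shuffleId: Int,
      execId: String,
      shouldUnregister: Int => Boolean): Unit = {
    if (shouldUnregister(shuffleId)) {
      // ... drop the map statuses for shuffleId that were written by execId ...
    }
  }
}

object CallerSideSketch {
  // DAGScheduler-ish caller; outputsStillReadable is a stand-in for whatever
  // the driver components would report about remotely persisted outputs.
  def onFetchFailure(
      tracker: MapOutputTrackerSketch,
      shuffleId: Int,
      execId: String,
      outputsStillReadable: Int => Boolean): Unit = {
    tracker.unregisterOutputsOnFetchFailure(shuffleId, execId, id => !outputsStillReadable(id))
  }
}
```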
We don't want to pass functional modules around through method calls - modules should be dependency injected at construction time.
What's the rationale for not passing functional modules through method calls?
Ownership of the module becomes unclear - the dependency tree should be more or less static and clear.
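A toy contrast of the two wiring styles being debated, with made-up types; the only difference is where the dependency enters.

```scala
// Illustration only, no Spark types involved.
trait DriverComponents { def shouldUnregister(shuffleId: Int): Boolean }

// Construction-time injection: ownership is fixed once and the dependency
// tree is visible from the constructor signatures.
class SchedulerWithInjection(components: DriverComponents) {
  def handleFetchFailure(shuffleId: Int): Unit = {
    if (components.shouldUnregister(shuffleId)) { /* unregister outputs */ }
  }
}

// Passing the module through the method call: every caller can hand in a
// different instance, so ownership of the module becomes unclear.
class SchedulerWithMethodPassing {
  def handleFetchFailure(shuffleId: Int, components: DriverComponents): Unit = {
    if (components.shouldUnregister(shuffleId)) { /* unregister outputs */ }
  }
}
```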
I think it's fine to eventually have ShuffleDataIO in this API, but it'll be tricky when it comes to upstream Spark because our APIs will still be in experimental status for now. I'm ok with deferring to the community to see what they think here, because I can't think of a much better way to set up the dependency injection.
We realized that there's complexity in deciding whether or not map outputs should be unregistered, and thus recomputed.
Previously we were using a boolean, then a three-way switch. But these modes do not capture a lot of the intricacies of how this should work. In particular, for our async upload proof of concept, we end up unregistering all the map outputs written by an executor, despite the fact that this invalidates and re-writes map outputs that were already persisted to the remote storage system.
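To make that limitation concrete, here is a sketch of the kind of per-output hook that the fixed modes cannot express; all names are invented for illustration and are not the API this PR adds.

```scala
// Invented names, illustration only.
final case class MapOutputId(shuffleId: Int, mapId: Int)

trait MapOutputUnregistrationHook {
  /** Called on fetch failure; return only the outputs that truly need recomputation. */
  def outputsToInvalidate(lostExecutorId: String, candidates: Seq[MapOutputId]): Seq[MapOutputId]
}

// An async-upload plugin would invalidate only the outputs that never finished
// uploading; everything already persisted remotely stays registered.
class AsyncUploadHook(uploaded: Set[MapOutputId]) extends MapOutputUnregistrationHook {
  override def outputsToInvalidate(
      lostExecutorId: String,
      candidates: Seq[MapOutputId]): Seq[MapOutputId] = {
    candidates.filterNot(uploaded.contains)
  }
}
```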