
Conversation

@yaooqinn
Member

What changes were proposed in this pull request?

The noop reduce id used to be 0, which results in a fixed index/data file name. When a node in your cluster has more than one disk, and one of them is broken while the fixed file name's hash code % the disk number happens to point to it, all attempts of that task will inevitably access the broken disk, which leads to meaningless task failures or can even tear down the whole job. Here we change the noop reduce id to the task attempt number so that each attempt produces a different file name, which may land on another, healthy disk.
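
A minimal sketch of the naming change (the case class and method names below are illustrative, not the actual Spark internals):

```scala
// Illustrative sketch only; the real logic lives in IndexShuffleBlockResolver
// and the shuffle block id classes, and may differ in detail.
case class ShuffleIndexName(shuffleId: Int, mapId: Int, reduceId: Int) {
  def fileName: String = s"shuffle_${shuffleId}_${mapId}_${reduceId}.index"
}

// Before: the noop reduce id is always 0, so every attempt of the same map
// task produces exactly the same file name (and hence hashes to the same disk).
def beforeFix(shuffleId: Int, mapId: Int): String =
  ShuffleIndexName(shuffleId, mapId, 0).fileName

// After: using the task attempt number as the noop reduce id makes the name,
// and therefore the hashed disk choice, vary across attempts.
def afterFix(shuffleId: Int, mapId: Int, attemptNumber: Int): String =
  ShuffleIndexName(shuffleId, mapId, attemptNumber).fileName
```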

Why are the changes needed?

We have an HDFS/YARN cluster with about 2k~3k nodes, and each node has 8T * 12 local disks for storage and shuffle. Sometimes, one or more disks get into a bad state during computations. Sometimes this causes job-level failure, sometimes it doesn't.

The following picture shows one failed job caused by 4 task attempts that were all delivered to the same node and failed with almost the same exception while writing the temporary index file to the same bad disk.
This is caused by two reasons:

  1. As we can see in the figure, the node has the best data locality for reading the data from HDFS. With the default spark.locality.wait (3s) taking effect, there is a high probability that those attempts will be scheduled to this node.
  2. The index or data file name for a particular shuffle map task is fixed. It is formed from the shuffle id, the map id, and the noop reduce id, which is always 0. The root local dir is picked by the fixed file name's non-negative hash code % the disk number, so this value is also fixed (see the sketch after the figure). Even when we have 12 disks in total and only one of them is broken, once the broken one is picked, all following attempts of this task will inevitably pick it again.

[Image: the failed job described above — 4 task attempts on the same node, all failing while writing the index file to the same bad disk]
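
For reference, a simplified sketch of how the root local dir is picked for a block file (modeled loosely on DiskBlockManager.getFile; the hashing here is simplified for illustration):

```scala
// Simplified: Spark picks the root local dir from a non-negative hash of the
// file name modulo the number of local dirs, so a fixed name means a fixed disk.
def pickLocalDir(fileName: String, localDirs: Seq[String]): String = {
  val hash = fileName.hashCode & Int.MaxValue   // non-negative hash
  localDirs(hash % localDirs.size)
}

val dirs = (0 until 12).map(i => s"/data$i/blockmgr")
// Every attempt of the same map task writes the same file name, so the same
// disk is chosen again and again, even if that disk is broken.
pickLocalDir("shuffle_3_42_0.index", dirs)
```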

Does this PR introduce any user-facing change?

NO

How was this patch tested?

Added some unit tests.

ping @cloud-fan @srowen @squito

@cloud-fan
Contributor

Can you give more details on the design? It seems to me that the shuffle reader and writer need to work together to support multiple disks well. I don't understand how the noop reduce id is related to it.

@yaooqinn
Member Author

@cloud-fan you're right. This would break the dependency between the reader and the writer. I will try to work out a proper solution.

@yaooqinn yaooqinn changed the title [SPARK-29257][Core][Shuffle] Use task attempt number as noop reduce id to handle disk failures during shuffle [WIP][SPARK-29257][Core][Shuffle] Use task attempt number as noop reduce id to handle disk failures during shuffle Sep 26, 2019
@turboFei
Member

turboFei commented Sep 26, 2019

About this issue, here is my thought.

The shuffle index file is kept per executor to store partition lengths.

And when the ESS is enabled, it will be read by the ESS.

I think we can define a parameter, such as spark.shuffle.index.maxRetry (just a placeholder name), to control the max number of retries for getting a NoOPReduceId, so we avoid landing on the same bad disk every time.

Its default value can be the number of localDirs.

We can define an inner class in IndexShuffleResolver and use an AtomicInteger to represent the value of the NOOPReduceId.

When we fail to get the index file, we incrementAndGet a new NOOPReduceId, which cannot exceed spark.shuffle.index.maxRetry.

So the name of the index file is shuffleId-mapId-retriedNoOPReduceId.

And when reading data from the index file, we should first try to read the index file named shuffleId-mapId-0; if that fails, fall back to shuffleId-mapId-1, and so on until shuffleId-mapId-${maxRetry-1}.
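
A rough sketch of this proposal (spark.shuffle.index.maxRetry and the class/helper names below are hypothetical, not existing Spark APIs):

```scala
import java.io.{File, FileNotFoundException}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical sketch of the retry/fallback scheme described above.
class RetriedIndexResolver(localDirsCount: Int) {
  private val maxRetry = localDirsCount            // default: the number of localDirs
  private val noopReduceId = new AtomicInteger(0)  // 0 on the first attempt

  private def indexName(shuffleId: Int, mapId: Int, id: Int): String =
    s"shuffle_${shuffleId}_${mapId}_${id}.index"

  // Writer side: use the current noop reduce id; on a disk failure, bump it
  // (bounded by maxRetry) and retry with the next file name.
  def writeName(shuffleId: Int, mapId: Int): String =
    indexName(shuffleId, mapId, noopReduceId.get())

  def onWriteFailure(): Unit =
    require(noopReduceId.incrementAndGet() < maxRetry,
      s"exceeded spark.shuffle.index.maxRetry = $maxRetry")

  // Reader side: try shuffleId-mapId-0 first, then fall back up to maxRetry - 1.
  def resolveForRead(dir: File, shuffleId: Int, mapId: Int): File =
    (0 until maxRetry).iterator
      .map(id => new File(dir, indexName(shuffleId, mapId, id)))
      .find(_.exists())
      .getOrElse(throw new FileNotFoundException(s"no index file for $shuffleId-$mapId"))
}
```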

@turboFei
Member

Oh, there is a question: how do we transfer this parameter to the ESS?

@SparkQA

SparkQA commented Sep 26, 2019

Test build #111414 has finished for PR 25941 at commit f8485cc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 26, 2019

Test build #111413 has finished for PR 25941 at commit 1acb0d9.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

> if that fails, fall back to shuffleId-mapId-1, and so on until shuffleId-mapId-${maxRetry-1}.

How should the shuffle writer write these files? Do you mean we need to slow down shuffle writing to write duplicated files?

@yaooqinn
Member Author

I prefer that we propagate attemptNumber with the map statuses.
The writer can produce “shuffle_$shuffleId_$mapId_$attemptNumber_0.index(data)”. For both the external and the internal shuffle service, (shuffleId, mapId, attemptNumber) can be used to identify these files.
Besides, since attemptNumber defaults to 0 in most cases, we only need to track the non-zero ones.
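
A minimal sketch of what that could look like (the case class and field names are hypothetical, not the actual MapStatus API):

```scala
// Hypothetical: carry attemptNumber in the map status so the reader can
// reconstruct exactly the file name the writer used.
case class MapStatusWithAttempt(shuffleId: Int, mapId: Int, attemptNumber: Int)

def indexFileName(m: MapStatusWithAttempt): String =
  s"shuffle_${m.shuffleId}_${m.mapId}_${m.attemptNumber}_0.index"

def dataFileName(m: MapStatusWithAttempt): String =
  s"shuffle_${m.shuffleId}_${m.mapId}_${m.attemptNumber}_0.data"

// Since attemptNumber is 0 for most tasks, only the non-zero values would
// need to be tracked explicitly; 0 can remain the implicit default.
```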

@cloud-fan
Contributor

This is a high-level question: can we be tolerant of disk failures without data duplication? If not, what's the design here for duplicating shuffle files?

@yaooqinn
Member Author

yaooqinn commented Sep 26, 2019

We will not create duplicate shuffle files at all. The unsuccessful attempts may have had a chance to create or pick a subdirectory under "blockmgr-xxx" before the disk failure, but they cannot commit the index and data files because of it. The “shuffle_$shuffleId_$mapId_$attemptNumber_0.index” and “shuffle_$shuffleId_$mapId_$attemptNumber_0.data” files may be located on different disks, and if either of them lands on the bad disk, the write process aborts and the task attempt fails. Only the successful task can commit those files.

In another case, if the disk failure happens in the shuffle read phase, it may cause a fetch-failed exception and re-run the dependent map tasks, but I guess for those duplicated files our processing logic will not change.

@cloud-fan
Contributor

OK, I get your point now. Please note that, after #25620, mapId becomes unique, so you would get a different file name when retrying the task.

@yaooqinn
Member Author

@cloud-fan, thanks for pointing me to #25620; the unique taskAttemptId can produce the same effect as $mapId_$attemptNumber. So I guess this problem has already been handled in the master branch.
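
To illustrate the point (the values below are made up): with a mapId that is unique per task attempt, a retried task already gets a different file name, and therefore possibly a different disk:

```scala
// Illustrative only: after #25620 the mapId in the shuffle file name is unique
// per task attempt, so a retry no longer reuses the failed attempt's file name.
def indexName(shuffleId: Int, uniqueMapId: Long): String =
  s"shuffle_${shuffleId}_${uniqueMapId}_0.index"

val firstAttempt = indexName(3, 10001L)  // e.g. "shuffle_3_10001_0.index"
val retry        = indexName(3, 10777L)  // different name => possibly a different disk
```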

@yaooqinn
Member Author

closing this because it isn't a problem anymore.

@yaooqinn yaooqinn closed this Sep 27, 2019
@yaooqinn yaooqinn deleted the SPARK-29257 branch September 27, 2019 02:03