[CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files #2373

Open
wants to merge 36 commits into
base: main

wangshengjie123
Contributor

What changes were proposed in this pull request?

Add logic to avoid sorting shuffle files in Reduce mode when optimizing skew partitions.

Why are the changes needed?

The current logic has to sort shuffle files when reading skew-partition shuffle files in Reduce mode, and we have seen shuffle sort timeouts and performance issues.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Cluster tests and UTs.

@wangshengjie123 wangshengjie123 changed the title from "[CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files" to "[WIP][CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files" on Mar 11, 2024
Contributor

@waitinfuture waitinfuture left a comment

Thanks @wangshengjie123 for this PR! I left some comments. In addition, is the small change to Spark missing?


int step = locations.length / subPartitionSize;

// if partition location is [1,2,3,4,5,6,7,8,9,10], and skew partition split to 3 task:
Contributor

Seems the logic should be like this:

    // if partition location is [1,2,3,4,5,6,7,8,9,10], and skew partition is split into 3 tasks:
    // task 0: 1, 4, 7, 10
    // task 1: 2, 5, 8
    // task 2: 3, 6, 9
    for (int i = 0; i < step + 1; i++) {
      int index = i * step + subPartitionIndex;
      if (index < locations.length) {
        result.add(orderedPartitionLocations[index]);
      }
    }

Contributor

If I am not wrong, the idea is to minimize the per-row size, which is why column 0 goes "down" the array index while column 1 goes "up", and this keeps alternating, so that as the sizes keep increasing they are distributed more evenly across rows (essentially a way to approximate the multi-way partition problem).

The result would be different for the formulation above @waitinfuture.

For example:

partition sizes: {1000, 1100, 1300, 1400, 2000, 2500, 3000, 10000, 20000, 25000, 28000, 30000}
subPartitionSize == 3
subPartitionIndex == 1

In formulation from PR we have:

task 0: 1000 , 2500 , 3000 , 30000
task 1: 1100 , 2000 , 10000 , 28000
task 2: 1300 , 1400 , 20000 , 25000

So the sizes will be:
task 0: 36500
task 1: 41100
task 2: 47700

As formulated above, we will end up with:

task 0: 1000 , 1400 , 3000 , 25000
task 1: 1100 , 2000 , 10000 , 28000
task 2: 1300 , 2500 , 20000 , 30000

In this case, the sizes will be:
task 0: 30400
task 1: 41100
task 2: 53800

Personally, I would have looked into either largest remainder or knapsack heuristic (given we are sorting anyway).

(Do let me know if I am missing something here @wangshengjie123)
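
To make the comparison above easy to reproduce, here is a minimal Java sketch that recomputes the per-task totals for the example sizes. The class and method names (`SkewSplitSketch`, `roundRobin`, `zigzag`, `greedyLargestFirst`) are hypothetical and are not taken from the PR. `zigzag` is my reading of the alternating scheme described above, `roundRobin` is the stride-based formulation, and `greedyLargestFirst` is only one simple balancing heuristic added for comparison; it is not necessarily what "largest remainder or knapsack" would look like in practice.

```java
import java.util.Arrays;

final class SkewSplitSketch {
  // Round-robin: the i-th location (ascending by size) goes to task i % numTasks.
  static long[] roundRobin(long[] sortedSizes, int numTasks) {
    long[] totals = new long[numTasks];
    for (int i = 0; i < sortedSizes.length; i++) {
      totals[i % numTasks] += sortedSizes[i];
    }
    return totals;
  }

  // Zigzag: even rows are dealt left to right, odd rows right to left.
  static long[] zigzag(long[] sortedSizes, int numTasks) {
    long[] totals = new long[numTasks];
    for (int i = 0; i < sortedSizes.length; i++) {
      int row = i / numTasks;
      int col = i % numTasks;
      int task = (row % 2 == 0) ? col : numTasks - 1 - col;
      totals[task] += sortedSizes[i];
    }
    return totals;
  }

  // Greedy (illustrative assumption): largest first, always to the lightest task so far.
  static long[] greedyLargestFirst(long[] sortedSizes, int numTasks) {
    long[] totals = new long[numTasks];
    for (int i = sortedSizes.length - 1; i >= 0; i--) {
      int lightest = 0;
      for (int t = 1; t < numTasks; t++) {
        if (totals[t] < totals[lightest]) {
          lightest = t;
        }
      }
      totals[lightest] += sortedSizes[i];
    }
    return totals;
  }

  public static void main(String[] args) {
    long[] sizes = {1000, 1100, 1300, 1400, 2000, 2500, 3000, 10000, 20000, 25000, 28000, 30000};
    System.out.println("round-robin: " + Arrays.toString(roundRobin(sizes, 3)));
    System.out.println("zigzag:      " + Arrays.toString(zigzag(sizes, 3)));
    System.out.println("greedy:      " + Arrays.toString(greedyLargestFirst(sizes, 3)));
  }
}
```

All three functions expect the sizes sorted in ascending order, as in the example.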

Contributor Author

@mridulm Sorry for the late reply. Your understanding is correct, and I should optimize the logic.

Contributor

Thanks @mridulm for the explanation, I actually didn't get the idea and was thinking the naive way :)

common/src/main/proto/TransportMessages.proto (outdated, resolved)
Contributor

@mridulm mridulm left a comment

Interesting work @wangshengjie123 !


@cfmcgrady
Contributor

Thanks @wangshengjie123 for this PR! I left some comments. In addition, is the small change to Spark missing?

Hi, @wangshengjie123
Can you please update the Spark patch? It will help the reviewers understand this PR better. Thanks!

@wangshengjie123
Contributor Author

Thanks @wangshengjie123 for this PR! I left some comments. In addition, is the small change to Spark missing?

HI, @wangshengjie123 Can you please update the Spark patch? It will help the reviewers understand this PR better. Thanks!

Sorry for the late reply; the PR will be updated today or tomorrow.

@wangshengjie123 wangshengjie123 force-pushed the optimize-skew-partition branch from b3af836 to 599be24 Compare March 16, 2024 08:14

codecov bot commented Mar 16, 2024

Codecov Report

Attention: Patch coverage is 1.20482% with 82 lines in your changes are missing coverage. Please review.

Project coverage is 48.51%. Comparing base (12c3779) to head (ef81070).
Report is 12 commits behind head on main.

Files                                                     Patch %   Lines
...born/common/protocol/message/ControlMessages.scala     0.00%     38 Missing ⚠️
.../apache/celeborn/common/write/PushFailedBatch.java     0.00%     24 Missing ⚠️
...org/apache/celeborn/common/util/PbSerDeUtils.scala     0.00%     9 Missing ⚠️
...g/apache/celeborn/common/protocol/StorageInfo.java     0.00%     6 Missing ⚠️
...va/org/apache/celeborn/common/write/PushState.java     16.67%    5 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2373      +/-   ##
==========================================
- Coverage   48.77%   48.51%   -0.26%     
==========================================
  Files         209      210       +1     
  Lines       13109    13186      +77     
  Branches     1134     1139       +5     
==========================================
+ Hits         6393     6396       +3     
- Misses       6294     6368      +74     
  Partials      422      422              


Contributor

@RexXiong RexXiong left a comment

Thanks @wangshengjie123, nice PR! One more suggestion: it would be better to add UTs for this feature.

@@ -1393,7 +1414,13 @@ public void onSuccess(ByteBuffer response) {
Arrays.toString(partitionIds),
groupedBatchId,
Arrays.toString(batchIds));

if (dataPushFailureTrackingEnabled) {
Contributor

There is no need to do this for HARD_SPLIT, as the worker never writes the batch on HARD_SPLIT. cc @waitinfuture

Contributor Author

I'm not sure whether it's possible for the master copy to succeed while the replica copy fails due to HARD_SPLIT. I will check it again.

@@ -615,6 +663,17 @@ private boolean fillBuffer() throws IOException {

// de-duplicate
if (attemptId == attempts[mapId]) {
if (splitSkewPartitionWithoutMapRange) {
Contributor

  1. We can reuse one PushFailedBatch object and update its inner fields to be more memory-efficient.
  2. Better to check whether failedBatches is empty first. Maybe we never need to check failed batches at all.
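
Both suggestions can be sketched roughly as follows. This is only an illustration under assumptions: `FailedBatchFilterSketch`, `BatchKey`, its three fields, and `shouldSkip` are hypothetical names, the real `PushFailedBatch` may carry different fields, and it is assumed here that the reader wants to skip batches recorded as failed. One mutable probe object is reused for every lookup, and the empty (or null) check comes first so the common case does no work.

```java
import java.util.HashSet;
import java.util.Set;

final class FailedBatchFilterSketch {
  // Hypothetical key; the fields of the real PushFailedBatch may differ.
  static final class BatchKey {
    int mapId;
    int attemptId;
    int batchId;

    BatchKey set(int mapId, int attemptId, int batchId) {
      this.mapId = mapId;
      this.attemptId = attemptId;
      this.batchId = batchId;
      return this;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof BatchKey)) {
        return false;
      }
      BatchKey k = (BatchKey) o;
      return mapId == k.mapId && attemptId == k.attemptId && batchId == k.batchId;
    }

    @Override
    public int hashCode() {
      return 31 * (31 * mapId + attemptId) + batchId;
    }
  }

  private final Set<BatchKey> failedBatches; // may be null when nothing failed
  private final BatchKey probe = new BatchKey(); // reused, never stored in the set

  FailedBatchFilterSketch(Set<BatchKey> failedBatches) {
    this.failedBatches = failedBatches;
  }

  // True when the batch was recorded as failed and should be skipped by the reader.
  boolean shouldSkip(int mapId, int attemptId, int batchId) {
    if (failedBatches == null || failedBatches.isEmpty()) {
      return false; // fast path, also avoids an NPE
    }
    return failedBatches.contains(probe.set(mapId, attemptId, batchId));
  }

  public static void main(String[] args) {
    Set<BatchKey> failed = new HashSet<>();
    failed.add(new BatchKey().set(1, 0, 7));
    FailedBatchFilterSketch filter = new FailedBatchFilterSketch(failed);
    System.out.println(filter.shouldSkip(1, 0, 7));
    System.out.println(filter.shouldSkip(1, 0, 8));
  }
}
```

Reusing the probe is safe only because it is used for lookups and never inserted into the set, and only if the filter is used from a single thread.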

Contributor Author

@wangshengjie123 wangshengjie123 Mar 24, 2024

  1. Got it.
  2. Fixed to avoid an NPE.

@@ -4671,4 +4671,13 @@ object CelebornConf extends Logging {
.version("0.5.0")
.intConf
.createWithDefault(10000)

val CLIENT_DATA_PUSH_FAILURE_TRACKING_ENABLED: ConfigEntry[Boolean] =
Contributor

Maybe we can use another configuration name for enabling the skew join optimization. CLIENT_DATA_PUSH_FAILURE_TRACKING_ENABLED doesn't feel very straightforward.

@wangshengjie123
Contributor Author

Thanks @wangshengjie123 nice pr! Another suggestion is better to add UT for this feature.

UTs are in progress. I am testing on a cluster this week, and the UTs will be submitted later.

@wangshengjie123 wangshengjie123 force-pushed the optimize-skew-partition branch 2 times, most recently from 8fe3a13 to 12eca26 Compare March 26, 2024 14:12
@s0nskar
Contributor

s0nskar commented Apr 4, 2024

@wangshengjie123 Is there any doc or ticket explaining this approach? Also for the sort based approach that you mentioned.

@s0nskar
Contributor

s0nskar commented Apr 4, 2024

From my understanding, in this PR we're diverging from the vanilla Spark approach based on mapIndex and instead dividing the full partition into multiple sub-partitions based on some heuristics. I'm new to the Celeborn code, so I might be missing something basic, but this PR does not address the issue below. Consider a basic scenario where a partial partition read is happening and we see a FetchFailure.

ShuffleMapStage --> ResultStage

  • ShuffleMapStage (attempt 0) generated [P0, P1, P2], and P0 is skewed with partition locations [0,1,2,3,4,5].
  • AQE asks for three splits, and this PR's logic will create three sub-partitions: [0, 1], [2, 3], [4, 5].
  • Now suppose the reducer reads [0, 1] and [2, 3] and gets a FetchFailure while reading [4, 5].
  • This will trigger a complete mapper stage retry according to this doc and will clear the map output corresponding to the shuffleID.
  • ShuffleMapStage (attempt 1) will again generate data for P0 at different partition locations [a, b, c, d, e, f], and it will be divided like [a, b], [c, d], [e, f].
  • Now if the reader stage is a ShuffleMapStage, it will read every sub-partition again, but if the reader is a ResultStage, it will only read the missing partition data, which is [e, f].

The data generated on location 1 and location a would be different because of other factors like network delay (the same applies to the other locations). For example, data that was present in the 1st location in the first attempt might end up in the 2nd location, or any other location, in a different attempt, depending on the order in which the mappers generated the data and the order in which the server received it.

This can cause both data loss and data duplication. This might be addressed somewhere else in the codebase that I'm not aware of, but I wanted to point this problem out.
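
The scenario above can be reproduced with a toy simulation. This is a minimal sketch under stated assumptions, not Celeborn code: the class `StageRerunSketch`, the record names `r0`..`r5`, and the particular placement chosen for the second attempt are all made up for illustration. Each location holds one record, the six locations are grouped into three contiguous sub-partitions, and the reader combines sub-partitions 0 and 1 from the first attempt with sub-partition 2 from the second.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

final class StageRerunSketch {
  // First attempt: six partition locations, one record each.
  static List<List<String>> attempt0() {
    return Arrays.asList(
        Arrays.asList("r0"), Arrays.asList("r1"), Arrays.asList("r2"),
        Arrays.asList("r3"), Arrays.asList("r4"), Arrays.asList("r5"));
  }

  // Rerun: the same records, but they land in different locations.
  static List<List<String>> attempt1() {
    return Arrays.asList(
        Arrays.asList("r4"), Arrays.asList("r1"), Arrays.asList("r2"),
        Arrays.asList("r5"), Arrays.asList("r0"), Arrays.asList("r3"));
  }

  // Locations are grouped into contiguous sub-partitions of equal size.
  static List<String> readSubPartition(List<List<String>> locations, int numSubs, int sub) {
    int perSub = locations.size() / numSubs;
    List<String> out = new ArrayList<>();
    for (int i = sub * perSub; i < (sub + 1) * perSub; i++) {
      out.addAll(locations.get(i));
    }
    return out;
  }

  // Sub-partitions 0 and 1 were read before the failure; only 2 is read after the rerun.
  static List<String> mixedRead() {
    List<String> read = new ArrayList<>();
    read.addAll(readSubPartition(attempt0(), 3, 0));
    read.addAll(readSubPartition(attempt0(), 3, 1));
    read.addAll(readSubPartition(attempt1(), 3, 2));
    return read;
  }

  static Set<String> duplicated(List<String> read) {
    Set<String> seen = new TreeSet<>();
    Set<String> dup = new TreeSet<>();
    for (String r : read) {
      if (!seen.add(r)) {
        dup.add(r);
      }
    }
    return dup;
  }

  static Set<String> missing(List<String> read) {
    Set<String> all = new TreeSet<>();
    for (List<String> location : attempt0()) {
      all.addAll(location);
    }
    all.removeAll(read);
    return all;
  }

  public static void main(String[] args) {
    List<String> read = mixedRead();
    System.out.println("read:       " + read);
    System.out.println("duplicated: " + duplicated(read));
    System.out.println("missing:    " + missing(read));
  }
}
```

With this placement, two records are read twice and two are never read, which is the loss-plus-duplication outcome described above.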

@pan3793
Member

pan3793 commented Apr 4, 2024

@s0nskar Good point, this should be an issue for ResultStage, even though the ShuffleMapStage's output is deterministic.

IIRC, vanilla Spark also has some limitations on stage retry cases for ResultStage when ShuffleMapStage's output is indeterministic, for such cases, we need to fail the job, right?

@s0nskar
Contributor

s0nskar commented Apr 4, 2024

@pan3793 This does not become a problem if we maintain the concept of mapIndex ranges, as Spark will always read deterministic output for each sub-partition.

Because vanilla Spark always reads deterministic output thanks to the mapIndex range filter, it does not face this issue. In this approach, sub-partition data will be indeterministic across stage attempts. Failing the job would be the only option for such cases until Spark starts supporting ResultStage rollback.

@s0nskar
Contributor

s0nskar commented Apr 4, 2024

Also, I think this issue is not limited to ResultStage; it can happen with ShuffleMapStage as well in some complex cases. Consider another scenario:

ShuffleMapStage1 -----> ShuffleMapStage2 ----->

  • Similar to the example above, let's say a skewed partition P0 is generated by ShuffleMapStage1.
  • ShuffleMapStage2 gets a FetchFailure while reading sub-partitions of ShuffleMapStage1.
  • ShuffleMapStage1 will be recomputed and its shuffle outputs will be cleared.
  • Only the missing tasks of ShuffleMapStage2 will be retried, again causing the same issue.

In this case, though, we can roll back the whole lineage up to this point instead of failing the job, similar to what vanilla Spark does, but this will be very expensive.

@pan3793
Member

pan3793 commented Apr 4, 2024

@s0nskar I see your point. When consuming skew partitions, we should always treat the previous ShuffleMapStage's output as indeterministic under the current approach to avoid correctness issues.

@waitinfuture
Contributor

waitinfuture commented Apr 4, 2024

Hi @s0nskar , thanks for your point, I think you are correct. Seems this PR conflicts with stage rerun.

we should always treat the previous ShuffleMapStage's output as indeterministic under the current approach to avoid correctness issues.

@pan3793 Is it possible to force it to be treated as indeterministic?

Also, I think Spark doesn't correctly set stage's determinism for some cases, for example a row_number window operator followed by aggregation keyed by the row_number.

cc @mridulm @ErikFang

@waitinfuture
Contributor

waitinfuture commented Apr 4, 2024

@wangshengjie123 Is there any doc or ticket explaining this approach? Also for the sort based approach that you mentioned.

The sort based approach is roughly like this:

  1. Each sub reducer reads from all partition splits of its partitionId for data within its map range
  2. The first read request will trigger the partition split file to be sorted based on map ids, so each IO will be sequential
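
The two steps above can be illustrated with a small in-memory sketch. It is only an approximation under assumptions: `SortedSplitSketch`, `Batch`, `sortByMapId`, and `readMapRange` are hypothetical names, and the real worker sorts files on disk rather than lists in memory. The point it shows is that once a split is ordered by map id, the batches for a map range `[startMap, endMap)` form one contiguous run that can be read sequentially.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class SortedSplitSketch {
  // One pushed batch inside a partition split file.
  static final class Batch {
    final int mapId;
    final String payload;

    Batch(int mapId, String payload) {
      this.mapId = mapId;
      this.payload = payload;
    }
  }

  // Step 2: order the split by map id (List.sort is stable, so per-map order is kept).
  static List<Batch> sortByMapId(List<Batch> file) {
    List<Batch> sorted = new ArrayList<>(file);
    sorted.sort(Comparator.comparingInt((Batch b) -> b.mapId));
    return sorted;
  }

  // Index of the first batch whose mapId is >= the given mapId.
  static int lowerBound(List<Batch> sorted, int mapId) {
    int lo = 0;
    int hi = sorted.size();
    while (lo < hi) {
      int mid = (lo + hi) >>> 1;
      if (sorted.get(mid).mapId < mapId) {
        lo = mid + 1;
      } else {
        hi = mid;
      }
    }
    return lo;
  }

  // Step 1: a sub reducer reads only the batches within its map range, in one sequential scan.
  static List<String> readMapRange(List<Batch> sorted, int startMap, int endMap) {
    List<String> out = new ArrayList<>();
    for (int i = lowerBound(sorted, startMap);
        i < sorted.size() && sorted.get(i).mapId < endMap;
        i++) {
      out.add(sorted.get(i).payload);
    }
    return out;
  }

  public static void main(String[] args) {
    List<Batch> file = Arrays.asList(
        new Batch(2, "a"), new Batch(0, "b"), new Batch(1, "c"),
        new Batch(0, "d"), new Batch(2, "e"), new Batch(1, "f"));
    List<Batch> sorted = sortByMapId(file);
    System.out.println(readMapRange(sorted, 0, 1));
    System.out.println(readMapRange(sorted, 1, 3));
  }
}
```

The sort is the cost that this PR tries to avoid, since it is triggered by the first read request and can be slow for large split files.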

(two images)

@s0nskar
Contributor

s0nskar commented Apr 4, 2024

Thanks a lot @waitinfuture for the sort based approach description.

Is it possible to force it to be treated as indeterministic?

IMO it would be very difficult to do this from Celeborn itself, but it can be done by patching the Spark code. ShuffledRowRDD can set the determinacy level to INDETERMINATE if partial partition reads are happening and Celeborn is being used.

cc: @mridulm for visibility

@pan3793
Member

pan3793 commented Apr 4, 2024

@waitinfuture It seems this PR is getting attention, and some discussions happened offline. We'd better update the PR description (or a Google Doc) to summarize the whole design and the known issues so far.

}
- PartialReducerPartitionSpec(reducerId, startMapIndex, endMapIndex, dataSize)
+ if (splitSkewPartitionWithCeleborn) {
+ PartialReducerPartitionSpec(reducerId, mapStartIndices.length, i, dataSize)
Contributor

We can maybe add a note here that these dataSize values will not be accurate. In the current downstream code we only use the sum of dataSize, which should be equal, but someone might use these values differently.

@mridulm
Contributor

mridulm commented Apr 6, 2024

It has been a while since I looked at this PR - but as formulated, the split into subranges is deterministic (if it is not, it should be made so).
With that in place, this would not be an issue ...
(I will take a deeper look later next week, but do let me know if I am missing something so that I can add that to my analysis)

@waitinfuture
Contributor

It has been a while since I looked at this PR - but as formulated, the split into subranges is deterministic (if it is not, it should be made so). With that in place, this would not be an issue ... (I will take a deeper look later next week, but do let me know if I am missing something so that I can add that to my analysis)

the split into subranges is deterministic

The way Celeborn splits partitions is not deterministic across stage reruns; for example, any push failure will cause a split, so I'm afraid this statement does not hold...

@mridulm
Contributor

mridulm commented Apr 6, 2024

Ah, I see what you mean ... PartitionLocation would change between retries.
Yeah, this is a problem then - it will cause data loss. This would be a variant of SPARK-23207

I will need to relook at the PR, and how it interacts with Celeborn - but if scenarios directly described in SPARK-23207 (or variants of it) are applicable (and we can't mitigate it), we should not proceed down this path given the correctness implications unfortunately.

@mridulm
Contributor

mridulm commented Apr 6, 2024

+CC @otterc as well.

@waitinfuture
Contributor

Ah, I see what you mean ... PartitionLocation would change between retries. Yeah, this is a problem then - it will cause data loss. This would be a variant of SPARK-23207

I will need to relook at the PR, and how it interacts with Celeborn - but if scenarios directly described in SPARK-23207 (or variants of it) are applicable (and we can't mitigate it), we should not proceed down this path given the correctness implications unfortunately.

Maybe we can keep both this optimization and stage rerun, but allow only one of them to take effect by checking configs for now. The performance issue this PR solves does happen in production.

@FMX
Contributor

FMX commented Dec 27, 2024

Changing the celeborn.worker.replicate.randomConnection.enabled to true can ensure that primary and replica shuffle files are identical. The cost of reusing the same transport client may be better than stage rerun.

@FMX FMX closed this Dec 27, 2024
@FMX FMX reopened this Dec 27, 2024
@@ -1531,6 +1548,12 @@ public void onSuccess(ByteBuffer response) {
pushState.onSuccess(hostPort);
callback.onSuccess(ByteBuffer.wrap(new byte[] {StatusCode.SOFT_SPLIT.getValue()}));
} else {
if (dataPushFailureTrackingEnabled) {
for (int i = 0; i < numBatches; i++) {
Contributor

We only need to add batchesNeedResubmit to FailedBatch.

Contributor Author

Thanks a lot, will fix and push later

@wangshengjie123
Contributor Author

Changing the celeborn.worker.replicate.randomConnection.enabled to true can ensure that primary and replica shuffle files are identical. The cost of reusing the same transport client may be better than stage rerun.

@FMX thanks, did you mean changing celeborn.worker.replicate.randomConnection.enabled to false, so that replication uses the same client to push replica data? I will check later.

@wangshengjie123
Contributor Author

Thanks for continuing to work on this @wangshengjie123, really appreciate it ! I will try to go over the PR during the holidays.

One general comment though ... It has been a while, so I am not very clear on some of the details about this PR. Having said that, I would suggest to minimize changes to Spark directly in order to add support for this PR - if we can find ways to do so. Some of the changes proposed in the diffs, at first read, look a bit disruptive - and are subject to arbitrary failures as spark evolves: even if technically we could justify their correctness right now, they are subject to being arbitrarily impacted as spark evolves - as well as within individual deployments. In other words, compared to other patches we have, which are much more pointed - the impact here could be nontrivial.

Thanks @mridulm, I have thought about this question. We have 3 modifications:
1. Change PartialReducerPartitionSpec in ShufflePartitionsUtil. This seems necessary and acceptable.
2. Modify DAGScheduler to forbid retrying the skew read stage. Alternatively, maybe the client could avoid throwing FetchFailed.
3. Determine whether the current shuffle has fallen back to the External Shuffle Service. This could be done by recording the Spark external shuffle ID and checking it.

But if we want to fully support the stage rerun feature, modifying the DAGScheduler will be necessary in a following PR.

@RexXiong
Contributor

RexXiong commented Jan 2, 2025

@wangshengjie123 I have already tested this PR, and everything works as expected. The job aborts when a stage is rerun, and I think the capability to support stage rerun can be included in the next PR. @mridulm Could you help review this PR?
