Skip to content

Conversation

@leixm
Copy link
Contributor

@leixm leixm commented Aug 27, 2022

###What changes were proposed in this pull request?
For issue #136 , When we use AQE, we may call shuffleWriteClient.getShuffleResult multiple times. But if both partition 1 and partition 2 are on the server A, we call getShuffleResult(partition 1) to get data form server A, and then we call getShuffleResult(partition 2) to get data form server A, it's not necassray. We can get getShuffleResult(partition 1, partition 2) instead.

###Why are the changes needed?
Improve getShuffleResult

###Does this PR introduce any user-facing change?
No

###How was this patch tested?
Added UT

@leixm
Copy link
Contributor Author

leixm commented Aug 27, 2022

@jerqi can you help review this pr plz?

@codecov-commenter
Copy link

codecov-commenter commented Aug 27, 2022

Codecov Report

Attention: Patch coverage is 42.57426% with 58 lines in your changes missing coverage. Please review.

Project coverage is 58.99%. Comparing base (5eb2758) to head (08c858f).
Report is 922 commits behind head on master.

Files with missing lines Patch % Lines
...pache/uniffle/server/ShuffleServerGrpcService.java 0.00% 29 Missing ⚠️
...he/uniffle/client/impl/ShuffleWriteClientImpl.java 0.00% 28 Missing ⚠️
.../java/org/apache/uniffle/common/util/RssUtils.java 95.45% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master     #190      +/-   ##
============================================
+ Coverage     58.40%   58.99%   +0.58%     
- Complexity     1273     1325      +52     
============================================
  Files           158      160       +2     
  Lines          8446     8699     +253     
  Branches        784      815      +31     
============================================
+ Hits           4933     5132     +199     
- Misses         3260     3303      +43     
- Partials        253      264      +11     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@jerqi
Copy link
Contributor

jerqi commented Aug 27, 2022

Is it a compatible feature? If this is a breaking change, we should dicuss about it in the mail list first. But it seems to become a compatible feature if you adjust your implement.

@leixm
Copy link
Contributor Author

leixm commented Aug 27, 2022

I think it's a compatible feature, could you give me more detail? @jerqi

@leixm
Copy link
Contributor Author

leixm commented Aug 27, 2022

Or we should keep the original implementation getShuffleResult(partitionId), and then add a new implementation getShuffleResult(startPartition, endPartition)?

@jerqi
Copy link
Contributor

jerqi commented Aug 27, 2022

A compatible feature means that old version server can use the new version server, the new version client can use the old version server. You change the protobuf field name, it will cause incompatibility in my thought.

@leixm
Copy link
Contributor Author

leixm commented Aug 27, 2022

Thank you for your reminder, i will make it compatible.

@leixm
Copy link
Contributor Author

leixm commented Aug 28, 2022

@jerqi can you help review this pr plz?

@leixm
Copy link
Contributor Author

leixm commented Aug 28, 2022

org.apache.uniffle.common.security.HadoopSecurityContextTest test failed cause by Address already in use, It seems to be caused by running multiple tests at the same time

@jerqi
Copy link
Contributor

jerqi commented Aug 28, 2022

@zuston Could you help me solve the problem of the flaky kerberos test?

@zuston
Copy link
Member

zuston commented Aug 28, 2022

@zuston Could you help me solve the problem of the flaky kerberos test?

OK. Let me solve this tomorrow.

@jerqi
Copy link
Contributor

jerqi commented Aug 28, 2022

I think we should test this with some SQL query in integration test.

@leixm
Copy link
Contributor Author

leixm commented Aug 28, 2022

You're right , I will add some test in integration test.

@jerqi
Copy link
Contributor

jerqi commented Aug 29, 2022

Flaky test was fixed by #191 .Thanks @zuston

@leixm
Copy link
Contributor Author

leixm commented Aug 30, 2022

I think we should test this with some SQL query in integration test.

Already added integration test, Can you help review? Thank you @jerqi

@jerqi
Copy link
Contributor

jerqi commented Aug 30, 2022

If the new version client access the old version server, what will happen? Could we fallback to use old way to access the old version server when there are exceptions?

@leixm
Copy link
Contributor Author

leixm commented Aug 30, 2022

If the new version client access the old version server, what will happen? Could we fallback to use old way to access the old version server when there are exceptions?

I think the new version of the client does not need to be compatible with the old version of the server.

@jerqi
Copy link
Contributor

jerqi commented Aug 30, 2022

If the new version client access the old version server, what will happen? Could we fallback to use old way to access the old version server when there are exceptions?

I think the new version of the client does not need to be compatible with the old version of the server.

OK, I make a mistake. Maybe we just guarantee that old client can access the new server.

int32 shuffleId = 2;
repeated int32 partitions = 3;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a new message for response?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remind

return hostName.replaceAll("[\\.-]", "_");
}

public static Map<Integer, Roaring64NavigableMap> shuffleBitmapToPartitionBitmap(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need startPartition and endPartition?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the partition may not have a block, but we need to fill it to avoid NPE when used, See

for (int partitionId = startPartition; partitionId <= endPartition; partitionId++) {
      result.putIfAbsent(partitionId, Roaring64NavigableMap.bitmapOf());
    }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the partition may not have a block, but we need to fill it to avoid NPE when used, See

for (int partitionId = startPartition; partitionId <= endPartition; partitionId++) {
      result.putIfAbsent(partitionId, Roaring64NavigableMap.bitmapOf());
    }

A little weird.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should have a better name.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment isn't resolved.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remind

public static Map<Integer, Roaring64NavigableMap> shuffleBitmapToPartitionBitmap(
Roaring64NavigableMap shuffleBitmap, int startPartition, int endPartition) {
Map<Integer, Roaring64NavigableMap> result = Maps.newHashMap();
for (int partitionId = startPartition; partitionId <= endPartition; partitionId++) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need <= instead of <?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We think we need <= here.

Copy link
Contributor

@jerqi jerqi Sep 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We think we need <= here.

Sorry, my mistake. We need < here

@jerqi
Copy link
Contributor

jerqi commented Aug 30, 2022

Could you provide some data about benchmark or performance test?

@leixm
Copy link
Contributor Author

leixm commented Aug 30, 2022

Could you provide some data about benchmark or performance test?

Ok, I will do some performance tests to try to prove that the performance will improve in some extreme scenarios of AQE

@leixm
Copy link
Contributor Author

leixm commented Sep 7, 2022

Environment

Shuffle Server Num : 5
Shuffle Write: 48G
Configuration: --conf spark.sql.shuffle.partitions=5000 --conf spark.sql.adaptive.enabled=true --conf spark.sql.adaptive.shuffle.targetPostShuffleInputSize=64MB

We measure the performance of get_shuffle_result by the following metrics:

  • get_shuffle_result_times: The number of calls of the get_shuffle_result interface
  • get_shuffle_result_cost: Time consumption of get_shuffle_result interface
  • get_shuffle_result_for_multi_part_times:The number of calls of the get_shuffle_result_for_multi_part interface
  • get_shuffle_result_for_multi_part_cost: Time consumption of get_shuffle_result_for_multi_part interface

Test Results

Before issue_136

serverId get_shuffle_result_times get_shuffle_result_cost(ms)
Server1 1000 157614
Server2 1000 426897
Server3 1000 269488
Server4 1000 906758
Server5 1001 123217
sum 5001 1883974

After issue_136

serverId get_shuffle_result_for_multi_part_times get_shuffle_result_for_multi_part_cost(ms)
Server1 833 870720
Server2 833 260865
Server3 834 333202
Server4 833 90277
Server5 835 94113
sum 4168 1649177

Summarize

The number of interface requests is reduced by 16%, and the total time is reduced by 12.5%. If we assign consecutive partitions to a server, the improvement will be more obvious.

@jerqi
Copy link
Contributor

jerqi commented Sep 7, 2022

Environment

Shuffle Server Num : 5 Shuffle Write: 48G Configuration: --conf spark.sql.shuffle.partitions=5000 --conf spark.sql.adaptive.enabled=true --conf spark.sql.adaptive.shuffle.targetPostShuffleInputSize=64MB

We measure the performance of get_shuffle_result by the following metrics:

  • get_shuffle_result_times: The number of calls of the get_shuffle_result interface
  • get_shuffle_result_cost: Time consumption of get_shuffle_result interface
  • get_shuffle_result_for_multi_part_times:The number of calls of the get_shuffle_result_for_multi_part interface
  • get_shuffle_result_for_multi_part_cost: Time consumption of get_shuffle_result_for_multi_part interface

Test Results

Before issue_136

serverId get_shuffle_result_times get_shuffle_result_cost(ms)
Server1 1000 157614
Server2 1000 426897
Server3 1000 269488
Server4 1000 906758
Server5 1001 123217
sum 5001 1883974
After issue_136

serverId get_shuffle_result_for_multi_part_times get_shuffle_result_for_multi_part_cost(ms)
Server1 833 870720
Server2 833 260865
Server3 834 333202
Server4 833 90277
Server5 835 94113
sum 4168 1649177

Summarize

The number of interface requests is reduced by 16%, and the total time is reduced by 12.5%. If we assign consecutive partitions to a server, the improvement will be more obvious.

What's your test case? TPC-DS?

@leixm
Copy link
Contributor Author

leixm commented Sep 7, 2022

No, this is our production task.

@jerqi
Copy link
Contributor

jerqi commented Sep 7, 2022

Please resolve the comments left.

@leixm
Copy link
Contributor Author

leixm commented Sep 8, 2022

Can you help review please? @jerqi

return hostName.replaceAll("[\\.-]", "_");
}

public static Map<Integer, Roaring64NavigableMap> shuffleBitmapToPartitionBitmap(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment isn't resolved.

jerqi
jerqi previously approved these changes Sep 8, 2022
Copy link
Contributor

@jerqi jerqi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are some comments that is not resolved.

@jerqi jerqi dismissed their stale review September 8, 2022 09:29

sorry, i miss the unresolved comments.

@leixm
Copy link
Contributor Author

leixm commented Sep 8, 2022

Can you help review please? @jerqi

Copy link
Contributor

@jerqi jerqi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks @leixm @zuston

@jerqi jerqi merged commit 6aa4379 into apache:master Sep 8, 2022
@leixm
Copy link
Contributor Author

leixm commented Sep 8, 2022

@jerqi @zuston Thank you very much, good brother.

@jerqi
Copy link
Contributor

jerqi commented Oct 26, 2022

Environment

Shuffle Server Num : 5 Shuffle Write: 48G Configuration: --conf spark.sql.shuffle.partitions=5000 --conf spark.sql.adaptive.enabled=true --conf spark.sql.adaptive.shuffle.targetPostShuffleInputSize=64MB

We measure the performance of get_shuffle_result by the following metrics:

  • get_shuffle_result_times: The number of calls of the get_shuffle_result interface
  • get_shuffle_result_cost: Time consumption of get_shuffle_result interface
  • get_shuffle_result_for_multi_part_times:The number of calls of the get_shuffle_result_for_multi_part interface
  • get_shuffle_result_for_multi_part_cost: Time consumption of get_shuffle_result_for_multi_part interface

Test Results

Before issue_136

serverId get_shuffle_result_times get_shuffle_result_cost(ms)
Server1 1000 157614
Server2 1000 426897
Server3 1000 269488
Server4 1000 906758
Server5 1001 123217
sum 5001 1883974
After issue_136

serverId get_shuffle_result_for_multi_part_times get_shuffle_result_for_multi_part_cost(ms)
Server1 833 870720
Server2 833 260865
Server3 834 333202
Server4 833 90277
Server5 835 94113
sum 4168 1649177

Summarize

The number of interface requests is reduced by 16%, and the total time is reduced by 12.5%. If we assign consecutive partitions to a server, the improvement will be more obvious.

Could you raise a new issue that Coordinator support to assign consecutive partitions to a server?

@leixm
Copy link
Contributor Author

leixm commented Oct 26, 2022

Environment

Shuffle Server Num : 5 Shuffle Write: 48G Configuration: --conf spark.sql.shuffle.partitions=5000 --conf spark.sql.adaptive.enabled=true --conf spark.sql.adaptive.shuffle.targetPostShuffleInputSize=64MB
We measure the performance of get_shuffle_result by the following metrics:

  • get_shuffle_result_times: The number of calls of the get_shuffle_result interface
  • get_shuffle_result_cost: Time consumption of get_shuffle_result interface
  • get_shuffle_result_for_multi_part_times:The number of calls of the get_shuffle_result_for_multi_part interface
  • get_shuffle_result_for_multi_part_cost: Time consumption of get_shuffle_result_for_multi_part interface

Test Results

Before issue_136
serverId get_shuffle_result_times get_shuffle_result_cost(ms)
Server1 1000 157614
Server2 1000 426897
Server3 1000 269488
Server4 1000 906758
Server5 1001 123217
sum 5001 1883974
After issue_136
serverId get_shuffle_result_for_multi_part_times get_shuffle_result_for_multi_part_cost(ms)
Server1 833 870720
Server2 833 260865
Server3 834 333202
Server4 833 90277
Server5 835 94113
sum 4168 1649177

Summarize

The number of interface requests is reduced by 16%, and the total time is reduced by 12.5%. If we assign consecutive partitions to a server, the improvement will be more obvious.

Could you raise a new issue that Coordinator support to assign consecutive partitions to a server?

No problem, I'll follow up on this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants