@jinxing64

What changes were proposed in this pull request?

In the current code, the blockIds in OpenBlocks are stored in the iterator on the shuffle service.
Each blockId ("shuffle_" + shuffleId + "_" + mapId + "_" + reduceId) contains redundant characters. This PR proposes to reduce the memory footprint and alleviate memory pressure on the shuffle service.

@jinxing64
Author

In my cluster, we are suffering from OOMs in the shuffle service.
We found that many executors are fetching blocks from a single shuffle service. Analyzing the memory, we found that the blockIds (shuffle_shuffleId_mapId_reduceId) take about 1.5 GB.

Member

@srowen srowen left a comment

Can that really save much memory? Seems trivial

@SparkQA

SparkQA commented Jun 7, 2017

Test build #77795 has finished for PR 18231 at commit 96d07aa.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

mapIdAndReduceIds = new byte[blockIds.length][];
if (blockIds.length > 0) {
for (int i = 0; i< blockIds.length; i++) {
mapIdAndReduceIds[i] = (blockIdParts[2] + "_" + blockIdParts[3]).getBytes();
Contributor

Instead of storing this as a byte array, how about storing them as ints or longs (depending on what's the actual data type of the id)?

e.g., instead of:

private byte[][] mapIdAndReduceIds;

Which results in blockIds.length + 1 arrays in total, you could have a single one where for each block id you have two entries, one for map id and one for reduce id, or something along those lines.

Author

Yes, I think this is a good idea. In the current change, I made it int[blockIds.length][2]. I'm not sure I understood your comment correctly. Please take another look :)

private byte[][] mapIdAndReduceIds;

ManagedBufferIterator(String appId, String execId, String[] blockIds) {
this.appId = appId;
Contributor

Wonder if you see a lot of these in your heap dump too? You could potentially intern appId and execId for some extra memory savings, if you see a lot of those.

Author

@vanzin
There's one appId and execId per stream. I don't see a lot in my heap dump. Do you have any thoughts for interning this? :)
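
For context, interning here would mean passing appId and execId through String.intern() so that equal strings share one canonical instance instead of one copy per stream. A minimal sketch, assuming a hypothetical helper class and made-up id values (neither is from this PR):

```java
// Hypothetical helper for illustration only; not part of the shuffle service.
final class InternSketch {
  /** Returns the canonical instance for s from the JVM string pool. */
  static String canonical(String s) {
    return s.intern();
  }

  public static void main(String[] args) {
    // Two equal strings that arrive in separate messages are distinct objects.
    String first = new String("app-0001");
    String second = new String("app-0001");
    System.out.println(first == second);                       // false
    // After interning, both references point at the same object.
    System.out.println(canonical(first) == canonical(second)); // true
  }
}
```

Whether this pays off depends on how many duplicate strings are actually live, which is what the heap dump question above is getting at.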

@SparkQA

SparkQA commented Jun 8, 2017

Test build #77806 has finished for PR 18231 at commit dcf156a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jinxing64
Author

@srowen
Thanks a lot for looking into this :)
For example, blockId="shuffle_20_1000_2000" is stored as a String, which costs more than 20 bytes. With this change, it will cost only 8 bytes.
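
The 8 bytes come from keeping only the mapId and reduceId as two ints. A minimal sketch of that parsing step, assuming a hypothetical ShuffleBlockIdParser class (the name is not from the patch):

```java
// Hypothetical helper for illustration; the class name is not from the patch.
final class ShuffleBlockIdParser {
  /** Parses "shuffle_<shuffleId>_<mapId>_<reduceId>" into {shuffleId, mapId, reduceId}. */
  static int[] parse(String blockId) {
    String[] parts = blockId.split("_");
    if (parts.length != 4 || !parts[0].equals("shuffle")) {
      throw new IllegalArgumentException("Unexpected block id format: " + blockId);
    }
    return new int[] {
      Integer.parseInt(parts[1]), Integer.parseInt(parts[2]), Integer.parseInt(parts[3])
    };
  }

  public static void main(String[] args) {
    int[] ids = parse("shuffle_20_1000_2000");
    // Only mapId and reduceId differ per block: two ints, 8 bytes of payload.
    System.out.println(ids[1] + ", " + ids[2]);
  }
}
```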

@jinxing64
Author

@vanzin
Thanks a lot for reviewing this. I refined the change according to your comments. Please take another look when you have time :)

@srowen
Member

srowen commented Jun 8, 2017

That's 12 bytes. Are there millions of these?

@jinxing64
Author

Actually, it's more than 12 bytes.
Yes, there are millions of these. In my heap dump, they take 1.5 GB.

Member

@srowen srowen left a comment

Pardon, I'm missing how this saves memory somewhere -- where is a string stored that's now a shorter string?

}
this.shuffleId = blockId0Parts[1];
mapIdAndReduceIds = new int[blockIds.length][2];
if (blockIds.length > 0) {
Member

This is superfluous

String[] blockId0Parts = blockIds[0].split("_");
if (blockId0Parts.length < 4) {
throw new IllegalArgumentException("Unexpected block id format: " + blockIds[0]);
} else if (!blockId0Parts[0].equals("shuffle")) {
Member

You don't need the 'else' here

Author

We have several kinds of BlockId, so I think it's better to have a check here to make sure we parse the blockId correctly.

Contributor

I think Sean means that since you're throwing in the previous block, else is redundant.

}
}

private class ManagedBufferIterator implements Iterator<ManagedBuffer> {
Member

Why break this out -- it's not necessary for the change right? just for clarity?

Author

I think the iterator is becoming a little complicated, so I broke it out and gave it a constructor.

@jinxing64
Author

@srowen Sorry, I didn't make it clear.

  1. In the current code, all blockIds are stored in the iterator. They are released only once the iterator has been fully traversed.
  2. With this change, each String is replaced by two ints.

@srowen
Member

srowen commented Jun 8, 2017

The current iterator doesn't have any state except for an int. What are you referring to?

@jinxing64
Author

I mean the blockIds in OpenBlocks; the iterator holds a reference to them.

@srowen
Member

srowen commented Jun 8, 2017

I get it. But that doesn't make the reference in OpenBlocks go away. This only helps if msg/msgObj can be garbage collected earlier. Is that the case? Right now this is allocating additional memory, not replacing the existing memory.

@jinxing64
Author

The blockIds cannot be freed because they are referenced in the iterator. With the current change they are not; we reference mapIdAndReduceIds instead. Thus the blockIds in OpenBlocks can be garbage collected.

@srowen
Member

srowen commented Jun 8, 2017

That's not the question though. The question is whether they could be freed even after this change. msg still references it. That's what you need to establish, if only by some empirical testing.

@jinxing64
Author

Nothing else references msg, right? I expect msg to be garbage collected promptly.

@srowen
Member

srowen commented Jun 8, 2017

I'm not clear that's true, no. Not, at least, in the lifetime of the iterator. That's what has to be true for this to help anything. Do you have evidence this is true? For example, if you have tests that clearly show the memory is released earlier, that would be good evidence.

@jinxing64
Author

Yes, I think it would be great to run some tests and provide good evidence.

@jinxing64
Author

@srowen
I ran a test to verify this patch.
I wrapped a number of blocks inside OpenBlocks and sent it to ExternalShuffleBlockHandler.
With this change:
it costs about 133M in memory; analyzing the heap dump, there is only the int[][], and blockIds is released.
Without this change:
it costs about 362M in memory; analyzing the heap dump, the String[] is still there.

@SparkQA

SparkQA commented Jun 8, 2017

Test build #77811 has finished for PR 18231 at commit 1e53262.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

private String execId;
private String shuffleId;
// An array containing mapId and reduceId pairs.
private int[][] mapIdAndReduceIds;
Contributor

@vanzin vanzin Jun 8, 2017

Actually, I mean a single array. e.g.

int[] mapIdAndReduceIds;

mapIdAndReduceIds = new int[blockIds.length * 2];
mapIdAndReduceIds[0] = mapId1;
mapIdAndReduceIds[1] = reduceId1;
mapIdAndReduceIds[2] = mapId2;
mapIdAndReduceIds[3] = reduceId2;
etc etc etc

Reason being that if you really have millions of these, each "child" array in your two-dimensional array wastes 16 (or 20?) bytes (16 bytes of object overhead + 4 bytes for the array length). Looking in jvisualvm, an empty array actually consumes 24 bytes, so it seems the JVM is aligning things and wasting an extra 4 bytes per array...
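
The flat layout described above can be sketched as follows. This is an illustration under assumptions, not the merged code: PackedBlockIds and its accessors are hypothetical names, and the block ids are assumed to follow the "shuffle_shuffleId_mapId_reduceId" format.

```java
// Hypothetical container for illustration; names are not from the patch.
final class PackedBlockIds {
  // Layout: [mapId0, reduceId0, mapId1, reduceId1, ...] in one array,
  // so there is a single array header instead of one per block id.
  private final int[] mapIdAndReduceIds;

  PackedBlockIds(String[] blockIds) {
    mapIdAndReduceIds = new int[2 * blockIds.length];
    for (int i = 0; i < blockIds.length; i++) {
      String[] parts = blockIds[i].split("_");
      if (parts.length != 4 || !parts[0].equals("shuffle")) {
        throw new IllegalArgumentException("Unexpected block id format: " + blockIds[i]);
      }
      mapIdAndReduceIds[2 * i] = Integer.parseInt(parts[2]);
      mapIdAndReduceIds[2 * i + 1] = Integer.parseInt(parts[3]);
    }
  }

  int size() { return mapIdAndReduceIds.length / 2; }

  int mapId(int i) { return mapIdAndReduceIds[2 * i]; }

  int reduceId(int i) { return mapIdAndReduceIds[2 * i + 1]; }
}
```

With int[n][2], every row is its own array object with its own header; the single int[2 * n] pays that overhead once.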

@SparkQA

SparkQA commented Jun 9, 2017

Test build #77831 has finished for PR 18231 at commit 8170c8a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin
Contributor

vanzin commented Jun 9, 2017

@srowen I don't see any references to the original OpenBlocks message or to the block id array in the updated code. I'm not sure why you think there's still a reference somewhere?

@srowen
Member

srowen commented Jun 9, 2017

There isn't a reference here anymore; there could be elsewhere. It sounds like there's good reason to believe there is not another reference hanging around though.

@vanzin
Contributor

vanzin commented Jun 9, 2017

There isn't a reference here anymore; there could be elsewhere.

Only if there was a bug in the RPC layer, since this is an RPC handler and the message should not be referenced by the RPC code after the method returns.

@SparkQA

SparkQA commented Jun 10, 2017

Test build #77863 has finished for PR 18231 at commit 1e72eab.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 10, 2017

Test build #77862 has finished for PR 18231 at commit 5dd0e77.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, reduceId). We make assumptions
* about how the hash and sort based shuffles store their data.
*/
public ManagedBuffer getBlockData(String appId, String execId, int shuffleId, int mapId,
Contributor

nit: style. See constructor at top of file for the style when param lists are long.

* assumptions about how the hash and sort based shuffles store their data.
* format "shuffle_ShuffleId_MapId_ReduceId" (from ShuffleBlockId).
*/
public ManagedBuffer getBlockData(String appId, String execId, String blockId) {
Contributor

Is this method used anywhere else? I only see ExternalShuffleBlockHandler using this class, and it now uses the new method. If only unit tests use this, then remove this method and fix the unit tests.

@SparkQA

SparkQA commented Jun 14, 2017

Test build #78017 has finished for PR 18231 at commit 3239653.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 14, 2017

Test build #78022 has finished for PR 18231 at commit a2af617.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@vanzin vanzin left a comment

One nit otherwise LGTM. I'll leave it overnight in case others want to take a look.

}
this.shuffleId = Integer.parseInt(blockId0Parts[1]);
mapIdAndReduceIds = new int[2 * blockIds.length];
for (int i = 0; i< blockIds.length; i++) {
Contributor

nit: space before <

@vanzin
Contributor

vanzin commented Jun 14, 2017

(also PR title has a typo, should be "redundant")

@jinxing64 jinxing64 changed the title [SPARK-20994] Remove reduant characters in OpenBlocks to save memory for shuffle service. [SPARK-20994] Remove redundant characters in OpenBlocks to save memory for shuffle service. Jun 15, 2017
@SparkQA

SparkQA commented Jun 15, 2017

Test build #78079 has finished for PR 18231 at commit 6677bc9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@jiangxb1987 jiangxb1987 left a comment

The change seems reasonable, but the code style still needs to be improved. Also cc @cloud-fan to make a pass.

this.appId = appId;
this.execId = execId;
String[] blockId0Parts = blockIds[0].split("_");
if (blockId0Parts.length < 4) {
Contributor

How about using require(blockId0Parts.length < 4, "Unexpected block id format: " + blockIds[0]) instead?

Author

I was planning to throw the IllegalArgumentException.
Pardon, I'm not sure how to use require in Java.

Contributor

Never mind, I didn't notice this is Java code.

Contributor

Shall we be more strict and use blockId0Parts.length != 4?

if (blockId0Parts.length < 4) {
throw new IllegalArgumentException("Unexpected block id format: " + blockIds[0]);
}
if (!blockId0Parts[0].equals("shuffle")) {
Contributor

ditto

mapIdAndReduceIds = new int[2 * blockIds.length];
for (int i = 0; i < blockIds.length; i++) {
String[] blockIdParts = blockIds[i].split("_");
if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
Contributor

blockIdParts[1].toInt != shuffleId ?

Author

It's hard to do that in Java, right?

}
this.shuffleId = Integer.parseInt(blockId0Parts[1]);
mapIdAndReduceIds = new int[2 * blockIds.length];
for (int i = 0; i < blockIds.length; i++) {
Contributor

How about rewriting this to be imperative?

Author

Pardon, could you give an example?

throw new RuntimeException(
String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
}

Contributor

nit: we should keep the original format.

Author

There are two blank lines originally. I guess it's appropriate to remove one?

};

String[] blockIds = { "shuffle_2_3_4", "shuffle_6_7_8" };
String[] blockIds = { "shuffle_0_1_2", "shuffle_0_3_4" };
Contributor

What's the purpose of this change?

Author

With this change, we cannot open blocks with multiple shuffleIds in one OpenBlocks message.
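
The restriction follows from storing a single shuffleId per iterator and rejecting any block id that disagrees with it. A minimal sketch of that check, assuming a hypothetical helper name and well-formed "shuffle_shuffleId_mapId_reduceId" ids:

```java
// Hypothetical helper for illustration; assumes every id is well formed.
final class SingleShuffleIdCheck {
  /** Returns the shuffleId shared by all block ids, or throws if they differ. */
  static int requireSingleShuffleId(String[] blockIds) {
    if (blockIds.length == 0) {
      throw new IllegalArgumentException("No block ids given");
    }
    int shuffleId = Integer.parseInt(blockIds[0].split("_")[1]);
    for (String blockId : blockIds) {
      if (Integer.parseInt(blockId.split("_")[1]) != shuffleId) {
        throw new IllegalArgumentException(
          "Expected shuffleId=" + shuffleId + ", got: " + blockId);
      }
    }
    return shuffleId;
  }
}
```

Under this check the old test input { "shuffle_2_3_4", "shuffle_6_7_8" } is rejected, while { "shuffle_0_1_2", "shuffle_0_3_4" } is accepted.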

@jinxing64
Author

@jiangxb1987
Thanks a lot for taking the time to review this PR. More comments are welcome.

this.shuffleId = Integer.parseInt(blockId0Parts[1]);
mapIdAndReduceIds = new int[2 * blockIds.length];
for (int i = 0; i < blockIds.length; i++) {
String[] blockIdParts = blockIds[i].split("_");
Contributor

Shall we check blockIdParts[0] == "shuffle"?


@Override
public boolean hasNext() {
return index < mapIdAndReduceIds.length / 2;
Contributor

nit: we can keep a pos, and increase it by 2 in next, so here we can just write pos < mapIdAndReduceIds.length to save a division.
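
The suggestion can be sketched as below. This is only an illustration: PairIterator is a hypothetical name, and it yields {mapId, reduceId} pairs, whereas the iterator in the patch returns a ManagedBuffer for each pair.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical iterator for illustration; yields {mapId, reduceId} pairs.
final class PairIterator implements Iterator<int[]> {
  private final int[] mapIdAndReduceIds;
  private int pos = 0; // index of the next mapId; always even

  PairIterator(int[] mapIdAndReduceIds) {
    if (mapIdAndReduceIds.length % 2 != 0) {
      throw new IllegalArgumentException("Expected an even number of entries");
    }
    this.mapIdAndReduceIds = mapIdAndReduceIds;
  }

  @Override
  public boolean hasNext() {
    // No division needed: pos advances in steps of 2.
    return pos < mapIdAndReduceIds.length;
  }

  @Override
  public int[] next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    int[] pair = { mapIdAndReduceIds[pos], mapIdAndReduceIds[pos + 1] };
    pos += 2;
    return pair;
  }
}
```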

@cloud-fan
Contributor

LGTM except some minor comments

@jinxing64
Author

@cloud-fan
Thanks a lot for taking the time to review this. I refined accordingly :)

@SparkQA

SparkQA commented Jun 16, 2017

Test build #78155 has started for PR 18231 at commit 2592ef4.

this.appId = appId;
this.execId = execId;
String[] blockId0Parts = blockIds[0].split("_");
if (blockId0Parts.length < 4 || !blockId0Parts[0].equals("shuffle")) {
Contributor

use blockId0Parts.length != 4?

Author

Sure.

@SparkQA

SparkQA commented Jun 16, 2017

Test build #78157 has started for PR 18231 at commit 5b0ce67.

@jinxing64
Author

Jenkins, retest this please

@SparkQA

SparkQA commented Jun 16, 2017

Test build #78166 has finished for PR 18231 at commit 5b0ce67.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@asfgit asfgit closed this in 93dd0c5 Jun 16, 2017
@jinxing64
Author

@cloud-fan
Thanks for merging!
