[NEMO-350] Implement Off-heap SerializedMemoryStore & [NEMO-384] Implement DirectByteBufferInputStream for Off-heap SerializedMemoryStore #222

Merged (65 commits) on Jun 25, 2019

hy00nc (Member) commented Jun 18, 2019

JIRA: NEMO-350: Implement Off-heap SerializedMemoryStore
NEMO-384: Implement DirectByteBufferInputStream for Off-heap SerializedMemoryStore

Major changes:

  • When a block is emitted by an executor, we write it directly to off-heap memory using DirectByteBufferOutputStream and read it back through DirectByteBufferInputStream.

Minor changes to note:

  • getData() and getBuffer() should be distinguished when acquiring data from a SerializedPartition.

Other comments:

  • This implementation does not guarantee a performance gain, since the overhead of allocateDirect (malloc) surpasses the garbage collection overhead. For this reason, memory management is being implemented.
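
For illustration, the write path described under "Major changes" might look roughly like the sketch below. This is not the code from this PR: the class name `OffHeapOutputSketch`, the `PAGE_SIZE` value, and the `readableViews()` helper are all hypothetical. Only `ByteBuffer.allocateDirect` and the standard `OutputStream` API are taken as given.

```java
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: an OutputStream that appends bytes to fixed-size direct buffers. */
final class OffHeapOutputSketch extends OutputStream {
  private static final int PAGE_SIZE = 4096; // assumed value, not taken from the PR
  private final List<ByteBuffer> pages = new ArrayList<>();
  private ByteBuffer current;

  OffHeapOutputSketch() {
    newPage();
  }

  private void newPage() {
    // allocateDirect is the call the description identifies as the costly one.
    current = ByteBuffer.allocateDirect(PAGE_SIZE);
    pages.add(current);
  }

  @Override
  public void write(final int b) {
    if (!current.hasRemaining()) {
      newPage();
    }
    current.put((byte) b); // only the low 8 bits of b are stored
  }

  /** Returns read-only views flipped for reading; the written pages keep their positions. */
  List<ByteBuffer> readableViews() {
    final List<ByteBuffer> views = new ArrayList<>();
    for (final ByteBuffer page : pages) {
      final ByteBuffer view = page.asReadOnlyBuffer();
      view.flip();
      views.add(view);
    }
    return views;
  }

  public static void main(final String[] args) {
    final OffHeapOutputSketch out = new OffHeapOutputSketch();
    for (int i = 0; i < 5000; i++) {
      out.write(i);
    }
    // 5000 bytes span two pages: 4096 bytes in the first, 904 in the second.
    for (final ByteBuffer view : out.readableViews()) {
      System.out.println(view.isDirect() + " " + view.remaining());
    }
  }
}
```

Each new page costs one `allocateDirect` call, which is where the overhead mentioned under "Other comments" would show up in a sketch like this. Reusing pages from a pool is one way the planned memory management could avoid that cost, though that is an assumption here.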

johnyangk (Contributor) left a comment

Thanks @hy00nc! I've left some comments. I also have two questions regarding the issue description.

First, I was not able to find in the code "If there is any chance of using getData() in SerializedPartition, we check the length of the SerializedData to ensure that this SerializedPartition was created from existing data". Can you point me to it?

Second, can you file a JIRA issue and add a TODO comment beside allocateDirect? That would make it clear that this line of code incurs overhead.

e.g.,
// TODO #999: ISSUE_TITLE

 */
@Override
public int read() throws IOException {
  return getBuffer().get() & 0xff;

johnyangk (Contributor) commented:

Can you make 0xff a static variable with a descriptive name?
I am curious why 0xff is needed here. Is there perhaps a link to a document that I can refer to? It'd be great to mention that in the comment as well.

hy00nc (Member, Author) commented:

You can probably refer to this website: https://rules.sonarsource.com/java/RSPEC-3034 . As far as I know, this is a Java-specific issue. Since a byte in Java is signed, it has to be masked with 0xff to get the unsigned value in the range 0 to 255 that InputStream.read() is expected to return. I will mention this in the comment, thanks :)
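
A minimal sketch of the masking issue discussed above. The class name `UnsignedByteMaskDemo` and the constant name `UNSIGNED_BYTE_MASK` are made up for illustration and are not the names used in the PR.

```java
import java.nio.ByteBuffer;

final class UnsignedByteMaskDemo {
  // Keeps only the low 8 bits after the byte is widened to int.
  private static final int UNSIGNED_BYTE_MASK = 0xff;

  /** Reads the next byte of the buffer as a value in the range 0 to 255. */
  static int readUnsigned(final ByteBuffer buffer) {
    return buffer.get() & UNSIGNED_BYTE_MASK;
  }

  public static void main(final String[] args) {
    final ByteBuffer buffer = ByteBuffer.allocateDirect(2);
    buffer.put((byte) 0xff).put((byte) 0x7f);
    buffer.flip();

    final int raw = buffer.get(0);            // absolute get, sign-extended: -1
    final int masked = readUnsigned(buffer);  // 255
    final int next = readUnsigned(buffer);    // 127

    System.out.println(raw + " " + masked + " " + next);
  }
}
```

Without the mask, a stored 0xff byte would come back as -1, which callers of `InputStream.read()` interpret as end of stream.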

 * list of {@link ByteBuffer}.
 */
public class DirectByteBufferInputStream extends InputStream {
  private List<ByteBuffer> bufList;

johnyangk (Contributor) commented:

Please add a comment, something like:
// The contents of direct buffers may reside outside of the normal garbage-collected heap

I found the above in the ByteBuffer documentation. Is there a way to "guarantee" that the contents reside outside of the heap?

My understanding is that this can be done by using DirectByteBuffer rather than the abstract class ByteBuffer. It'd be good to make this change, especially since this class is named DirectByteBufferInputStream.
https://www.javacodegeeks.com/2013/08/which-memory-is-faster-heap-or-bytebuffer-or-direct.html

hy00nc (Member, Author) commented Jun 21, 2019:

I think you're right, the name of the class implies that it is only for DirectByteBuffer. I guess it is better to rename the class to ByteBufferInputStream, because the DirectByteBuffer class is not public and an instance can only be obtained by calling the allocateDirect() method of ByteBuffer. It is possible to check whether a ByteBuffer is direct or not, but I am afraid there is not much point in checking when the data has already been written (on-heap or off-heap) and we only intend to read it. Does that seem okay?

johnyangk (Contributor) commented:

Agreed.
I am also fine with keeping the current class name DirectByteBufferInputStream, as we also have DirectByteBufferOutputStream.
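
To make the point of this thread concrete, here is a hedged sketch of an input stream over a list of `ByteBuffer`s that accepts direct and heap buffers alike. The class `ByteBufferListInputSketch` and its `allDirect` helper are hypothetical and are not the PR's `DirectByteBufferInputStream`. The only library calls assumed are `ByteBuffer.allocateDirect`, `ByteBuffer.wrap`, and `ByteBuffer.isDirect`.

```java
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: reads sequentially from a list of buffers, direct or not. */
final class ByteBufferListInputSketch extends InputStream {
  private final List<ByteBuffer> buffers;
  private int index = 0;

  ByteBufferListInputSketch(final List<ByteBuffer> buffers) {
    // Note: reading advances the positions of the caller's buffers.
    this.buffers = buffers;
  }

  @Override
  public int read() {
    while (index < buffers.size()) {
      final ByteBuffer current = buffers.get(index);
      if (current.hasRemaining()) {
        return current.get() & 0xff; // mask so the result is in 0..255
      }
      index++; // this buffer is exhausted, move to the next one
    }
    return -1; // end of stream
  }

  /** Runtime check for off-heap storage; the static type ByteBuffer cannot express it. */
  static boolean allDirect(final List<ByteBuffer> buffers) {
    for (final ByteBuffer buffer : buffers) {
      if (!buffer.isDirect()) {
        return false;
      }
    }
    return true;
  }

  public static void main(final String[] args) {
    final ByteBuffer direct = ByteBuffer.allocateDirect(2);
    direct.put((byte) 1).put((byte) 2);
    direct.flip();
    final ByteBuffer heap = ByteBuffer.wrap(new byte[] {(byte) 0xfe});

    final List<ByteBuffer> mixed = Arrays.asList(direct, heap);
    System.out.println(allDirect(mixed)); // false, because one buffer is heap-backed

    final ByteBufferListInputSketch in = new ByteBufferListInputSketch(mixed);
    int value;
    while ((value = in.read()) != -1) {
      System.out.println(value);
    }
  }
}
```

The reading logic is identical for both kinds of buffer, which is the reason given above for not enforcing directness on the read side.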

hy00nc (Member, Author) commented Jun 19, 2019

Thanks for the review @johnyangk !

For the first question, I forgot to update the comment after making some bug fixes. Sorry for the confusion. Please check the updated comment :)

For the second one, I can add a note about the overhead to the existing JIRA issue. I will also add the TODO in the code, thanks!

johnyangk (Contributor) left a comment

Thanks @hy00nc! The code looks good.
I've left minor questions. I'll merge as soon as you confirm.

hy00nc (Member, Author) commented Jun 25, 2019

Thanks again! I made minor changes and it's ready to be merged :)

@johnyangk johnyangk merged commit ea448db into apache:master Jun 25, 2019
alapha23 pushed a commit to alapha23/incubator-nemo that referenced this pull request Aug 2, 2019
…ement DirectByteBufferInputStream for Off-heap SerializedMemoryStore (apache#222)
