Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NEMO-350] Implement Off-heap SerializedMemoryStore & [NEMO-384] Implement DirectByteBufferInputStream for Off-heap SerializedMemoryStore #222

Merged
merged 65 commits into from
Jun 25, 2019
Merged
Show file tree
Hide file tree
Changes from 57 commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
26f364f
substitution: DirectByteArrayOutputstream to DirectByteBufferOutputSt…
hy00nc Apr 17, 2019
4ded359
TPC-H application codes
hy00nc Apr 25, 2019
8c557d9
TPC-H application codes
hy00nc Apr 25, 2019
3ec9c8e
TPC-H application codes
hy00nc Apr 25, 2019
1dc1dd8
beam api changed
hy00nc Apr 25, 2019
666cd01
beam tests
hy00nc Apr 25, 2019
05e7033
logging
hy00nc Apr 26, 2019
ceba288
page size experiment 32KB
hy00nc May 4, 2019
0ab5c93
initial implementation
hy00nc May 8, 2019
940bef7
getBufferList fix
hy00nc May 8, 2019
9789d7e
revert and disable clearing list for now
hy00nc May 8, 2019
34848af
tests
hy00nc May 8, 2019
d01c0f0
For writeToFile experiment...
hy00nc May 8, 2019
87409c7
no stream
hy00nc May 9, 2019
6a296a6
logging
hy00nc May 9, 2019
4d349df
use SerializedMemoryStore
hy00nc May 9, 2019
5766022
logging
hy00nc May 9, 2019
e6b271e
logging
hy00nc May 9, 2019
767c182
new edge annotation
hy00nc May 9, 2019
e40064b
Return duplicated buffer when calling getBufferList()
May 10, 2019
c75ec2c
Small fix
May 10, 2019
a1bc8d6
tests for getData()
hy00nc May 10, 2019
64b9df2
Fix toByteArray()
May 10, 2019
47c9832
Merge branch 'offHeapTest' of github.com:hy00nc/incubator-nemo into o…
May 10, 2019
8f77496
revert customized DefaultDataStorePass
hy00nc May 10, 2019
19bad77
Merge branch 'offHeapTest' of https://github.com/hy00nc/incubator-nem…
hy00nc May 10, 2019
e1be1c4
Implement zero-copy file block write
May 10, 2019
597c2f1
Merge branch 'offHeapTest' of github.com:hy00nc/incubator-nemo into o…
May 10, 2019
5339c3a
Fix type conversion error
May 10, 2019
b1b7eb0
default + serializedmemorystore annotatation
hy00nc May 10, 2019
984652b
logging
hy00nc May 11, 2019
fbb577e
ConcurrentLinkedQueue for reuse
hy00nc May 11, 2019
cb8b9e6
allocation size decrease
hy00nc May 11, 2019
930cea0
allocation size increase
hy00nc May 11, 2019
dcd571c
allocation size increase
hy00nc May 11, 2019
4214da3
logging finalize() occurrence
hy00nc May 11, 2019
c4420b6
erase logging
hy00nc May 12, 2019
e0e7c8b
erase logging
hy00nc May 12, 2019
fc69549
on-heap in decode()
hy00nc May 12, 2019
673c98d
preallocation size decrease
hy00nc May 13, 2019
e9fc080
no more toByteArray() and exception when getData() called
hy00nc May 13, 2019
c5efdcf
use DirectByteBufferInputStream in serializedPartition -> nonSerializ…
hy00nc May 13, 2019
a2713f4
initial DirectByteBufferInputStream
hy00nc May 13, 2019
7253e6f
new Constructor for SerializedPartition
hy00nc May 13, 2019
93ecf76
offheap for converting non-serialized data to serialized data
hy00nc May 13, 2019
edd1975
initial edits
hy00nc Jun 18, 2019
53faa33
erase useless codes
hy00nc Jun 18, 2019
58925d3
ByteOutputContext
hy00nc Jun 18, 2019
d56494e
comments and erases
hy00nc Jun 18, 2019
39412ad
test
hy00nc Jun 18, 2019
fa49ca3
file channel use refactoring
hy00nc Jun 18, 2019
fbf97e2
file channel fix
hy00nc Jun 18, 2019
48c6b98
checkstyle
hy00nc Jun 18, 2019
6f4dae5
getData() exception
hy00nc Jun 18, 2019
e5ef592
checkstyle
hy00nc Jun 18, 2019
d0e5478
checkstyle
hy00nc Jun 18, 2019
0cb1f1e
Merge branch 'master' into Off-Heap_PR
hy00nc Jun 18, 2019
17cd1d7
getBuffer() fixed to return the duplicated list of ByteBuffers.
hy00nc Jun 19, 2019
7b4a6bc
fixed to pass BlockStoreTest
hy00nc Jun 19, 2019
617dfd3
Merge branch 'Off-Heap_PR' of https://github.com/hy00nc/incubator-nem…
hy00nc Jun 19, 2019
1b512c4
refactoring
hy00nc Jun 21, 2019
449649d
implements autocloseable
hy00nc Jun 22, 2019
38578df
refactoring
hy00nc Jun 24, 2019
c55d54d
rename class ByteBufferInputStream
hy00nc Jun 25, 2019
bf689d2
minor comment editing
hy00nc Jun 25, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.common;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.List;

/**
* This class is a customized input stream implementation which reads data from
* list of {@link ByteBuffer}.
*/
public class DirectByteBufferInputStream extends InputStream {
private List<ByteBuffer> bufList;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment something like:
// The contents of direct buffers may reside outside of the normal garbage-collected heap

I found the above in the ByteBuffer documentation. Is there a way to "guarantee" that the contents reside outside of the heap?

My understanding is that it can be done by using DirectByteBuffer, rather than the abstract class ByteBuffer. It'd be good to make this change, also since the name of this class is DirectByteBufferInputStream.
https://www.javacodegeeks.com/2013/08/which-memory-is-faster-heap-or-bytebuffer-or-direct.html

Copy link
Member Author

@hy00nc hy00nc Jun 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're right, the name of the class implies that this is only for DirectByteBuffer. I guess it is better to change the name of the class to ByteBufferInputStream because DirectByteBuffer class is not public and it can only be constructed by calling allocateDirect() method in ByteBuffer. It is possible to check whether the ByteBuffer is direct or not, but I am afraid it has no much meaning in checking it when it is already written(in on-heap or off-heap) and we are intending to read it. Does it seem okay?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed.
I am also fine with keeping the current class name DirectByteBufferInputStream, as we also have DirectByteBufferOutputStream.

private int current = 0;

/**
* Default Constructor.
*
* @param bufList is the target data to read.
*/
public DirectByteBufferInputStream(final List<ByteBuffer> bufList) {
this.bufList = bufList;
}

/**
* Reads data from the list of {@code ByteBuffer}s.
*
* @return integer.
* @throws IOException
*/
@Override
public int read() throws IOException {
return getBuffer().get() & 0xff;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make 0xff a static variable with a descriptive name?
I am curious why 0xff is needed here. Is there perhaps a link to a document that I can refer to? It'd be great to mention that in the comment as well.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can probably refer to this website: https://rules.sonarsource.com/java/RSPEC-3034 . As far as I know, this is a java-specific issue. Since the raw byte value in java represents signed value, it should be masked to get the proper value. I will mention this in the comment thanks :)

}

/**
* Return next non-empty @code{ByteBuffer}.
*
* @return @code{ByteBuffer} to write the data
* @throws IOException when fail to retrieve buffer.
*/
public ByteBuffer getBuffer() throws IOException {
while (current < bufList.size()) {
ByteBuffer buffer = bufList.get(current);
if (buffer.hasRemaining()) {
return buffer;
}
current += 1;
}
throw new EOFException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

Expand All @@ -31,11 +32,10 @@
public final class DirectByteBufferOutputStream extends OutputStream {
hy00nc marked this conversation as resolved.
Show resolved Hide resolved

private LinkedList<ByteBuffer> dataList = new LinkedList<>();
private static final int DEFAULT_PAGE_SIZE = 4096;
private static final int DEFAULT_PAGE_SIZE = 32768; //32KB
hy00nc marked this conversation as resolved.
Show resolved Hide resolved
private final int pageSize;
private ByteBuffer currentBuf;


/**
* Default constructor.
* Sets the {@code pageSize} as default size of 4096 bytes.
Expand Down Expand Up @@ -140,38 +140,45 @@ public byte[] toByteArray() {
final int arraySize = pageSize * (dataList.size() - 1) + lastBuf.position();
final byte[] byteArray = new byte[arraySize];
int start = 0;
int byteToWrite;

for (final ByteBuffer temp : dataList) {
for (final ByteBuffer buffer : dataList) {
// ByteBuffer has to be shifted to read mode by calling ByteBuffer.flip(),
// which sets limit to the current position and sets the position to 0.
// Note that capacity remains unchanged.
temp.flip();
byteToWrite = temp.remaining();
temp.get(byteArray, start, byteToWrite);
final ByteBuffer dupBuffer = buffer.duplicate();
hy00nc marked this conversation as resolved.
Show resolved Hide resolved
dupBuffer.flip();
final int byteToWrite = dupBuffer.remaining();
dupBuffer.get(byteArray, start, byteToWrite);
hy00nc marked this conversation as resolved.
Show resolved Hide resolved
start += byteToWrite;
}
// The limit of the last buffer has to be set to the capacity for additional write.
lastBuf.limit(lastBuf.capacity());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this no longer needed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We changed it to read from duplicated buffer so the original one maintains its limit.


return byteArray;
}

/**
* Returns the list of {@code ByteBuffer}s that contains the written data.
* Note that by calling this method, the existing list of {@code ByteBuffer}s is cleared.
*
* @return the {@code LinkedList} of {@code ByteBuffer}s.
*/
public List<ByteBuffer> getBufferListAndClear() {
List<ByteBuffer> result = dataList;
dataList = new LinkedList<>();
for (final ByteBuffer buffer : result) {
buffer.flip();
public List<ByteBuffer> getBufferList() {
hy00nc marked this conversation as resolved.
Show resolved Hide resolved
List<ByteBuffer> result = new ArrayList<>(dataList.size());
for (final ByteBuffer buffer : dataList) {
final ByteBuffer dupBuffer = buffer.duplicate();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto on duplicate().

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please refer to the comment above 👍

dupBuffer.flip();
result.add(dupBuffer);
}
return result;
}

/**
* Returns the size of the data written in this output stream.
*
* @return the size of the data
*/
public int size() {
return pageSize * (dataList.size() - 1) + dataList.getLast().position();
}

/**
* Closing this output stream has no effect.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void testGetBufferList() {
String value = RandomStringUtils.randomAlphanumeric(10000);
outputStream.write(value.getBytes());
byte[] totalOutput = outputStream.toByteArray();
List<ByteBuffer> bufList = outputStream.getBufferListAndClear();
List<ByteBuffer> bufList = outputStream.getBufferList();
int offset = 0;
int byteToRead;
for (final ByteBuffer temp : bufList) {
Expand Down
1 change: 0 additions & 1 deletion examples/beam/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ under the License.
<groupId>org.apache.nemo</groupId>
<artifactId>nemo-client</artifactId>
<version>${project.version}</version>
<scope>test</scope>
hy00nc marked this conversation as resolved.
Show resolved Hide resolved
</dependency>
<dependency>
<groupId>com.github.fommil.netlib</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,13 @@
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

import static io.netty.buffer.Unpooled.wrappedBuffer;

/**
* Container for multiple output streams. Represents a transfer context on sender-side.
Expand Down Expand Up @@ -157,12 +161,37 @@ public void write(final byte[] bytes, final int offset, final int length) throws
* @return {@code this}
* @throws IOException when an exception has been set or this stream was closed
*/
public ByteOutputStream writeSerializedPartition(final SerializedPartition serializedPartition)
public ByteOutputStream writeSerializedPartitionBuffer(final SerializedPartition serializedPartition)
throws IOException {
write(serializedPartition.getData(), 0, serializedPartition.getLength());
hy00nc marked this conversation as resolved.
Show resolved Hide resolved
writeBuffer(serializedPartition.getBuffer());
return this;
}

/**
* Wraps each of the {@link ByteBuffer} in the bufList to {@link ByteBuf} object
* to write a data frame.
*
* @param bufList
* @throws IOException
*/
public void writeBuffer(final List<ByteBuffer> bufList) throws IOException {
final ByteBuf byteBuf = wrappedBuffer(bufList.toArray(new ByteBuffer[bufList.size()]));
hy00nc marked this conversation as resolved.
Show resolved Hide resolved
writeByteBuf(byteBuf);
}


/**
* Writes a data frame, from {@link ByteBuf}.
*
* @param byteBuf {@link ByteBuf} to write.
* @throws IOException
*/
private void writeByteBuf(final ByteBuf byteBuf) throws IOException {
if (byteBuf.readableBytes() > 0) {
writeDataFrame(byteBuf, byteBuf.readableBytes());
}
}

/**
* Writes a data frame from {@link FileArea}.
*
Expand Down Expand Up @@ -196,17 +225,6 @@ public void close() throws IOException {
closed = true;
}

/**
* Writes a data frame, from {@link ByteBuf}.
*
* @param byteBuf {@link ByteBuf} to write.
*/
private void writeByteBuf(final ByteBuf byteBuf) throws IOException {
if (byteBuf.readableBytes() > 0) {
writeDataFrame(byteBuf, byteBuf.readableBytes());
}
}

/**
* Write an element to the channel.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ public void run() {
final Iterable<SerializedPartition> partitions = optionalBlock.get().readSerializedPartitions(keyRange);
for (final SerializedPartition partition : partitions) {
try (ByteOutputContext.ByteOutputStream os = outputContext.newOutputStream()) {
os.writeSerializedPartition(partition);
os.writeSerializedPartitionBuffer(partition);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
package org.apache.nemo.runtime.executor.data;

import com.google.common.io.CountingInputStream;
import org.apache.nemo.common.DirectByteArrayOutputStream;
import org.apache.nemo.common.DirectByteBufferInputStream;
import org.apache.nemo.common.DirectByteBufferOutputStream;
import org.apache.nemo.common.coder.DecoderFactory;
import org.apache.nemo.common.coder.EncoderFactory;
import org.apache.nemo.runtime.executor.data.partition.NonSerializedPartition;
Expand All @@ -31,6 +32,7 @@
import org.slf4j.LoggerFactory;

import java.io.*;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -112,19 +114,19 @@ public static <K extends Serializable> Iterable<SerializedPartition<K>> convertT
final List<SerializedPartition<K>> serializedPartitions = new ArrayList<>();
for (final NonSerializedPartition<K> partitionToConvert : partitionsToConvert) {
try (
DirectByteArrayOutputStream bytesOutputStream = new DirectByteArrayOutputStream();
hy00nc marked this conversation as resolved.
Show resolved Hide resolved
OutputStream wrappedStream = buildOutputStream(bytesOutputStream, serializer.getEncodeStreamChainers());
DirectByteBufferOutputStream bytesOutputStream = new DirectByteBufferOutputStream();
hy00nc marked this conversation as resolved.
Show resolved Hide resolved
OutputStream wrappedStream = buildOutputStream(bytesOutputStream, serializer.getEncodeStreamChainers())
hy00nc marked this conversation as resolved.
Show resolved Hide resolved
) {
serializePartition(serializer.getEncoderFactory(), partitionToConvert, wrappedStream);
// We need to close wrappedStream on here, because DirectByteArrayOutputStream:getBufDirectly() returns
// inner buffer directly, which can be an unfinished(not flushed) buffer.
wrappedStream.close();
// Note that serializedBytes include invalid bytes.
// So we have to use it with the actualLength by using size() whenever needed.
final byte[] serializedBytes = bytesOutputStream.getBufDirectly();
final List<ByteBuffer> serializedBufList = bytesOutputStream.getBufferList();
hy00nc marked this conversation as resolved.
Show resolved Hide resolved
final int actualLength = bytesOutputStream.size();
serializedPartitions.add(
new SerializedPartition<>(partitionToConvert.getKey(), serializedBytes, actualLength));
new SerializedPartition<>(partitionToConvert.getKey(), serializedBufList, actualLength));
}
}
return serializedPartitions;
Expand All @@ -148,10 +150,10 @@ public static <K extends Serializable> Iterable<NonSerializedPartition<K>> conve
final K key = partitionToConvert.getKey();


try (ByteArrayInputStream byteArrayInputStream =
new ByteArrayInputStream(partitionToConvert.getData())) {
try (DirectByteBufferInputStream byteBufferInputStream =
new DirectByteBufferInputStream(partitionToConvert.getBuffer())) {
final NonSerializedPartition<K> deserializePartition = deserializePartition(
partitionToConvert.getLength(), serializer, key, byteArrayInputStream);
partitionToConvert.getLength(), serializer, key, byteBufferInputStream);
nonSerializedPartitions.add(deserializePartition);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,23 @@
import org.slf4j.LoggerFactory;

import javax.annotation.concurrent.NotThreadSafe;
import java.io.*;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* This class represents a block which is stored in (local or remote) file.
Expand Down Expand Up @@ -83,11 +96,13 @@ public FileBlock(final String blockId,
*/
private void writeToFile(final Iterable<SerializedPartition<K>> serializedPartitions)
throws IOException {
try (FileOutputStream fileOutputStream = new FileOutputStream(filePath, true)) {
try (FileChannel fileOutputChannel = new FileOutputStream(filePath, true).getChannel()) {
hy00nc marked this conversation as resolved.
Show resolved Hide resolved
for (final SerializedPartition<K> serializedPartition : serializedPartitions) {
// Reserve a partition write and get the metadata.
metadata.writePartitionMetadata(serializedPartition.getKey(), serializedPartition.getLength());
fileOutputStream.write(serializedPartition.getData(), 0, serializedPartition.getLength());
for (final ByteBuffer buffer: serializedPartition.getBuffer()) {
fileOutputChannel.write(buffer);
hy00nc marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
}
Expand Down
Loading