[NEMO-350] Implement Off-heap SerializedMemoryStore & [NEMO-384] Implement DirectByteBufferInputStream for Off-heap SerializedMemoryStore (apache#222)

JIRA: [NEMO-350: Implement Off-heap SerializedMemoryStore](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-350)
[NEMO-384: Implement DirectByteBufferInputStream for Off-heap SerializedMemoryStore](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-384)

**Major changes:**
- When a block is emitted by an executor, we write it directly to off-heap memory using `DirectByteBufferOutputStream` and read it back through `ByteBufferInputStream` (a minimal round-trip sketch follows these notes).

**Minor changes to note:**
- `getData()` and `getBuffer()` should be distinguished when acquiring data from a `SerializedPartition`.

**Other comments:**
- This implementation alone does not guarantee a performance gain, since the overhead of `allocateDirect` (a native malloc per page) can surpass the garbage-collection overhead it avoids. For this reason, off-heap memory management (reusing `ByteBuffer`s, TODO #388) is being implemented.
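
To make the data path concrete, here is a minimal, hypothetical round-trip sketch. It uses only the classes and methods that appear in this diff (`DirectByteBufferOutputStream`, `getDirectByteBufferList()`, `ByteBufferInputStream`); the wrapper class name and the sample payload are made up for illustration.

```java
import org.apache.nemo.common.ByteBufferInputStream;
import org.apache.nemo.common.DirectByteBufferOutputStream;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;

/** Hypothetical usage example; not part of this commit. */
public final class OffHeapRoundTripExample {
  public static void main(final String[] args) throws IOException {
    // Serialized partition data is written into off-heap (direct) ByteBuffers, page by page.
    final DirectByteBufferOutputStream outputStream = new DirectByteBufferOutputStream();
    final byte[] original = "serialized partition contents".getBytes(StandardCharsets.UTF_8);
    outputStream.write(original);

    // The flipped, duplicated buffers are handed to the read side without copying into a byte[].
    final List<ByteBuffer> buffers = outputStream.getDirectByteBufferList();

    // ByteBufferInputStream reads the same bytes back, one unsigned byte at a time.
    final ByteBufferInputStream inputStream = new ByteBufferInputStream(buffers);
    final byte[] restored = new byte[original.length];
    for (int i = 0; i < restored.length; i++) {
      restored[i] = (byte) inputStream.read();
    }
    System.out.println(new String(restored, StandardCharsets.UTF_8));
  }
}
```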
hy00nc authored and alapha23 committed Aug 2, 2019
1 parent 40da9ad commit b249697
Showing 14 changed files with 252 additions and 298 deletions.
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.common;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.List;

/**
* This class is a customized input stream implementation which reads data from a
* list of {@link ByteBuffer}s. If a {@link ByteBuffer} is direct, its contents may reside
* outside the normal garbage-collected heap memory.
*/
public class ByteBufferInputStream extends InputStream {
private List<ByteBuffer> bufList;
private int current = 0;
private static final int BITMASK = 0xff;

/**
* Default Constructor.
*
* @param bufList is the target data to read.
*/
public ByteBufferInputStream(final List<ByteBuffer> bufList) {
this.bufList = bufList;
}

/**
* Reads the next byte of data from the list of {@code ByteBuffer}s.
*
* @return the next byte of data, as an unsigned value in the range 0 to 255.
* @throws IOException when there is no more data to read.
*/
@Override
public int read() throws IOException {
// Since Java's byte is a signed type, we mask it so that the byte is treated
// as unsigned when retrieving its int value from the sequence of bytes.
return getBuffer().get() & BITMASK;
}

/**
* Returns the next non-empty {@code ByteBuffer}.
*
* @return the next {@code ByteBuffer} to read data from.
* @throws IOException when it fails to retrieve the next buffer.
*/
public ByteBuffer getBuffer() throws IOException {
while (current < bufList.size()) {
ByteBuffer buffer = bufList.get(current);
if (buffer.hasRemaining()) {
return buffer;
}
current += 1;
}
throw new EOFException();
}
}
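
The `& BITMASK` (0xff) in `read()` is what keeps a stored byte such as `(byte) 0xFF` from being returned as `-1`, which callers would mistake for end-of-stream. A tiny standalone illustration (the class name is hypothetical, not part of this commit):

```java
/** Hypothetical illustration of the sign-masking done in ByteBufferInputStream#read(). */
public final class ByteMaskExample {
  public static void main(final String[] args) {
    final byte stored = (byte) 0xFF;  // a raw byte as it would come out of a ByteBuffer
    final int unmasked = stored;      // widens to -1, because Java bytes are signed
    final int masked = stored & 0xff; // 255, the intended unsigned value
    System.out.println(unmasked + " vs " + masked);
  }
}
```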

This file was deleted.

@@ -18,24 +18,29 @@
*/
package org.apache.nemo.common;

import com.google.common.annotations.VisibleForTesting;

import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

/**
* This class is a customized output stream implementation backed by
* {@link ByteBuffer}, which utilizes off-heap memory when writing the data.
* Memory is allocated in chunks of the specified {@code pageSize} when needed.
* The {@code dataList}, which is the memory this output stream holds, is deleted
* when the corresponding block is deleted.
* TODO #388: Off-heap memory management (reuse ByteBuffer) - implement reuse.
*/
public final class DirectByteBufferOutputStream extends OutputStream {

private LinkedList<ByteBuffer> dataList = new LinkedList<>();
private static final int DEFAULT_PAGE_SIZE = 4096;
private static final int DEFAULT_PAGE_SIZE = 32768; //32KB
private final int pageSize;
private ByteBuffer currentBuf;


/**
* Default constructor.
* Sets the {@code pageSize} to the default size of 32768 bytes.
@@ -45,8 +50,9 @@ public DirectByteBufferOutputStream() {
}

/**
* Constructor specifying the {@code size}.
* Sets the {@code pageSize} as {@code size}.
* Constructor which sets {@code pageSize} to the specified {@code size}.
* Note that the {@code pageSize} involves a trade-off between memory fragmentation and
* native memory (de)allocation overhead.
*
* @param size should be a power of 2 and greater than or equal to 4096.
*/
@@ -62,6 +68,7 @@ public DirectByteBufferOutputStream(final int size) {
/**
* Allocates new {@link ByteBuffer} with the capacity equal to {@code pageSize}.
*/
// TODO #388: Off-heap memory management (reuse ByteBuffer)
private void newLastBuffer() {
dataList.addLast(ByteBuffer.allocateDirect(pageSize));
}
@@ -120,14 +127,15 @@ public void write(final byte[] b, final int off, final int len) {
}
}


/**
* Creates a byte array that contains the whole content currently written in this output stream.
* Note that this method causes an array copy, which could degrade performance.
* TODO #384: For performance issue, implement an input stream so that we do not have to use this method.
*
* USED BY TESTS ONLY.
* @return the current contents of this output stream, as a byte array.
*/
public byte[] toByteArray() {
@VisibleForTesting
byte[] toByteArray() {
if (dataList.isEmpty()) {
final byte[] byteArray = new byte[0];
return byteArray;
@@ -140,38 +148,48 @@ public byte[] toByteArray() {
final int arraySize = pageSize * (dataList.size() - 1) + lastBuf.position();
final byte[] byteArray = new byte[arraySize];
int start = 0;
int byteToWrite;

for (final ByteBuffer temp : dataList) {
// ByteBuffer has to be shifted to read mode by calling ByteBuffer.flip(),
// which sets limit to the current position and sets the position to 0.
// Note that capacity remains unchanged.
temp.flip();
byteToWrite = temp.remaining();
temp.get(byteArray, start, byteToWrite);

for (final ByteBuffer buffer : dataList) {
// We read through a duplicated buffer so that there is no complicated
// manipulation of position and limit when switching between read and write modes.
final ByteBuffer dupBuffer = buffer.duplicate();
dupBuffer.flip();
final int byteToWrite = dupBuffer.remaining();
dupBuffer.get(byteArray, start, byteToWrite);
start += byteToWrite;
}
// The limit of the last buffer has to be set to the capacity for additional write.
lastBuf.limit(lastBuf.capacity());

return byteArray;
}

/**
* Returns the list of {@code ByteBuffer}s that contains the written data.
* Note that by calling this method, the existing list of {@code ByteBuffer}s is cleared.
* A list of flipped and duplicated {@link ByteBuffer}s is returned; each buffer has an
* independent position and limit, which reduces erroneous reads and writes.
* This method should be called when the caller intends to read from the start of the list of
* {@link ByteBuffer}s, not to perform additional writes.
*
* @return the list of {@code ByteBuffer}s holding the written data.
*/
public List<ByteBuffer> getBufferListAndClear() {
List<ByteBuffer> result = dataList;
dataList = new LinkedList<>();
for (final ByteBuffer buffer : result) {
buffer.flip();
public List<ByteBuffer> getDirectByteBufferList() {
List<ByteBuffer> result = new ArrayList<>(dataList.size());
for (final ByteBuffer buffer : dataList) {
final ByteBuffer dupBuffer = buffer.duplicate();
dupBuffer.flip();
result.add(dupBuffer);
}
return result;
}

/**
* Returns the size of the data written in this output stream.
*
* @return the size of the written data, in bytes.
*/
public int size() {
return pageSize * (dataList.size() - 1) + dataList.getLast().position();
}

/**
* Closing this output stream has no effect.
*/
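
A sketch of how the flipped duplicates returned by `getDirectByteBufferList()` behave, assuming only the methods visible in this diff (the example class name is hypothetical): reading a returned buffer advances its own position, not that of the stream's internal buffers, so further writes remain possible.

```java
import org.apache.nemo.common.DirectByteBufferOutputStream;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;

/** Hypothetical example; not part of this commit. */
public final class DuplicatedBufferExample {
  public static void main(final String[] args) throws IOException {
    final DirectByteBufferOutputStream outputStream = new DirectByteBufferOutputStream();
    outputStream.write("first chunk".getBytes(StandardCharsets.UTF_8));

    // The returned buffers are flipped duplicates with their own position and limit,
    // so reading them does not alter the buffers the stream keeps writing into.
    final List<ByteBuffer> snapshot = outputStream.getDirectByteBufferList();
    final byte[] readBack = new byte[snapshot.get(0).remaining()];
    snapshot.get(0).get(readBack);

    // The stream itself is untouched; additional writes still land after the existing data.
    outputStream.write(" and more".getBytes(StandardCharsets.UTF_8));
    System.out.println(new String(readBack, StandardCharsets.UTF_8)
        + " | bytes written so far: " + outputStream.size());
  }
}
```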
@@ -18,10 +18,10 @@
*/
package org.apache.nemo.common.coder;

import org.apache.nemo.common.DirectByteArrayOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
@@ -82,7 +82,7 @@ private BytesDecoder(final InputStream inputStream) {
public byte[] decode() throws IOException {
// We cannot use inputStream.available() to know the length of bytes to read.
// The available method only returns the number of bytes that can be read without blocking.
final DirectByteArrayOutputStream byteOutputStream = new DirectByteArrayOutputStream();
final ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
int b = inputStream.read();
while (b != -1) {
byteOutputStream.write(b);
@@ -97,11 +97,11 @@ public void testLongReRead() {
}

@Test
public void testGetBufferList() {
public void testGetDirectBufferList() {
String value = RandomStringUtils.randomAlphanumeric(10000);
outputStream.write(value.getBytes());
byte[] totalOutput = outputStream.toByteArray();
List<ByteBuffer> bufList = outputStream.getBufferListAndClear();
List<ByteBuffer> bufList = outputStream.getDirectByteBufferList();
int offset = 0;
int byteToRead;
for (final ByteBuffer temp : bufList) {
