Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
import org.apache.uniffle.common.RssShuffleUtils;
import org.apache.uniffle.common.exception.RssException;

public class RssShuffleDataIterator<K, C> extends AbstractIterator<Product2<K, C>> {

Expand All @@ -54,6 +55,7 @@ public class RssShuffleDataIterator<K, C> extends AbstractIterator<Product2<K, C
private DeserializationStream deserializationStream = null;
private ByteBufInputStream byteBufInputStream = null;
private long unCompressionLength = 0;
private ByteBuffer uncompressedData;

public RssShuffleDataIterator(
Serializer serializer,
Expand Down Expand Up @@ -106,8 +108,17 @@ public boolean hasNext() {
shuffleReadMetrics.incFetchWaitTime(fetchDuration);
if (compressedData != null) {
shuffleReadMetrics.incRemoteBytesRead(compressedData.limit() - compressedData.position());
// Directbytebuffers are not collected in time will cause executor easy
// be killed by cluster managers(such as YARN) for using too much offheap memory
if (uncompressedData != null && uncompressedData.isDirect()) {
try {
RssShuffleUtils.destroyDirectByteBuffer(uncompressedData);
} catch (Exception e) {
throw new RssException("Destroy DirectByteBuffer failed!", e);
}
}
long startDecompress = System.currentTimeMillis();
ByteBuffer uncompressedData = RssShuffleUtils.decompressData(
uncompressedData = RssShuffleUtils.decompressData(
compressedData, compressedBlock.getUncompressLength());
unCompressionLength += compressedBlock.getUncompressLength();
long decompressDuration = System.currentTimeMillis() - startDecompress;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

package org.apache.uniffle.common;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;

import com.google.common.base.Preconditions;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4FastDecompressor;
Expand Down Expand Up @@ -56,4 +59,29 @@ public static ByteBuffer decompressData(ByteBuffer data, int uncompressLength, b
fastDecompressor.decompress(data, data.position(), uncompressData, 0, uncompressLength);
return uncompressData;
}

/**
* DirectByteBuffers are garbage collected by using a phantom reference and a
* reference queue. Every once a while, the JVM checks the reference queue and
* cleans the DirectByteBuffers. However, as this doesn't happen
* immediately after discarding all references to a DirectByteBuffer, it's
* easy to OutOfMemoryError yourself using DirectByteBuffers. This function
* explicitly calls the Cleaner method of a DirectByteBuffer.
*
* @param toBeDestroyed
* The DirectByteBuffer that will be "cleaned". Utilizes reflection.
*
*/
public static void destroyDirectByteBuffer(ByteBuffer toBeDestroyed)
throws IllegalArgumentException, IllegalAccessException,
InvocationTargetException, SecurityException, NoSuchMethodException {
Preconditions.checkArgument(toBeDestroyed.isDirect(),
"toBeDestroyed isn't direct!");
Method cleanerMethod = toBeDestroyed.getClass().getMethod("cleaner");
cleanerMethod.setAccessible(true);
Object cleaner = cleanerMethod.invoke(toBeDestroyed);
Method cleanMethod = cleaner.getClass().getMethod("clean");
cleanMethod.setAccessible(true);
cleanMethod.invoke(cleaner);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,8 @@ public class RssException extends RuntimeException {
public RssException(String message) {
super(message);
}

public RssException(String message, Throwable e) {
super(message, e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import java.nio.ByteBuffer;
import org.apache.commons.lang3.RandomUtils;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class RssShuffleUtilsTest {

Expand All @@ -46,4 +48,27 @@ public void testCompression(int size) {
assertArrayEquals(data, buffer2);
}

@Test
public void testDestroyDirectByteBuffer() throws Exception {
int size = 10;
byte b = 1;
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(size);
for (int i = 0; i < size; i++) {
byteBuffer.put(b);
}
byteBuffer.flip();
RssShuffleUtils.destroyDirectByteBuffer(byteBuffer);
// The memory may not be released fast enough.
Thread.sleep(200);
boolean same = true;
byte[] read = new byte[size];
byteBuffer.get(read);
for (byte br : read) {
if (b != br) {
same = false;
break;
}
}
assertTrue(!same);
}
}