From 35c18cee4d1959766f80762a6da79d57d65c189e Mon Sep 17 00:00:00 2001 From: xianjingfeng <583872483@qq.com> Date: Tue, 26 Jul 2022 16:08:06 +0800 Subject: [PATCH 1/5] [Improvement] ShuffleBlock should be release when finished reading --- .../spark/shuffle/reader/RssShuffleDataIterator.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java index 775285ca2f..b8645b0da3 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java +++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java @@ -38,6 +38,7 @@ import org.apache.uniffle.client.api.ShuffleReadClient; import org.apache.uniffle.client.response.CompressedShuffleBlock; import org.apache.uniffle.common.RssShuffleUtils; +import sun.nio.ch.DirectBuffer; public class RssShuffleDataIterator extends AbstractIterator> { @@ -54,6 +55,7 @@ public class RssShuffleDataIterator extends AbstractIterator Date: Thu, 28 Jul 2022 23:26:49 +0800 Subject: [PATCH 2/5] Use reflection to destory DirectByteBuffer --- .../reader/RssShuffleDataIterator.java | 10 +++++-- .../uniffle/common/RssShuffleUtils.java | 28 +++++++++++++++++++ .../uniffle/common/RssShuffleUtilsTest.java | 25 +++++++++++++++++ 3 files changed, 61 insertions(+), 2 deletions(-) diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java index b8645b0da3..f7b67b32d3 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java +++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java @@ -18,6 +18,7 @@ package org.apache.spark.shuffle.reader; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; import java.nio.ByteBuffer; import com.esotericsoftware.kryo.io.Input; @@ -38,7 +39,6 @@ import org.apache.uniffle.client.api.ShuffleReadClient; import org.apache.uniffle.client.response.CompressedShuffleBlock; import org.apache.uniffle.common.RssShuffleUtils; -import sun.nio.ch.DirectBuffer; public class RssShuffleDataIterator extends AbstractIterator> { @@ -108,8 +108,14 @@ public boolean hasNext() { shuffleReadMetrics.incFetchWaitTime(fetchDuration); if (compressedData != null) { shuffleReadMetrics.incRemoteBytesRead(compressedData.limit() - compressedData.position()); + // Directbytebuffers are not collected in time will cause executor easy + // be killed by cluster managers(such as YARN) for using too much offheap memory if (uncompressedData != null && uncompressedData.isDirect()) { - ((DirectBuffer)uncompressedData).cleaner().clean(); + try { + RssShuffleUtils.destroyDirectByteBuffer(uncompressedData); + } catch (Exception e) { + throw new RuntimeException("Destroy DirectByteBuffer failed!", e); + } } long startDecompress = System.currentTimeMillis(); uncompressedData = RssShuffleUtils.decompressData( diff --git a/common/src/main/java/org/apache/uniffle/common/RssShuffleUtils.java b/common/src/main/java/org/apache/uniffle/common/RssShuffleUtils.java index 15b2c7c42a..58db058ee9 100644 --- a/common/src/main/java/org/apache/uniffle/common/RssShuffleUtils.java +++ b/common/src/main/java/org/apache/uniffle/common/RssShuffleUtils.java @@ -17,8 +17,11 @@ package org.apache.uniffle.common; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.nio.ByteBuffer; +import com.google.common.base.Preconditions; import net.jpountz.lz4.LZ4Compressor; import net.jpountz.lz4.LZ4Factory; import net.jpountz.lz4.LZ4FastDecompressor; @@ -56,4 +59,29 @@ public static ByteBuffer decompressData(ByteBuffer data, int uncompressLength, b fastDecompressor.decompress(data, data.position(), uncompressData, 0, uncompressLength); return uncompressData; } + + /** + * DirectByteBuffers are garbage collected by using a phantom reference and a + * reference queue. Every once a while, the JVM checks the reference queue and + * cleans the DirectByteBuffers. However, as this doesn't happen + * immediately after discarding all references to a DirectByteBuffer, it's + * easy to OutOfMemoryError yourself using DirectByteBuffers. This function + * explicitly calls the Cleaner method of a DirectByteBuffer. + * + * @param toBeDestroyed + * The DirectByteBuffer that will be "cleaned". Utilizes reflection. + * + */ + public static void destroyDirectByteBuffer(ByteBuffer toBeDestroyed) + throws IllegalArgumentException, IllegalAccessException, + InvocationTargetException, SecurityException, NoSuchMethodException { + Preconditions.checkArgument(toBeDestroyed.isDirect(), + "toBeDestroyed isn't direct!"); + Method cleanerMethod = toBeDestroyed.getClass().getMethod("cleaner"); + cleanerMethod.setAccessible(true); + Object cleaner = cleanerMethod.invoke(toBeDestroyed); + Method cleanMethod = cleaner.getClass().getMethod("clean"); + cleanMethod.setAccessible(true); + cleanMethod.invoke(cleaner); + } } diff --git a/common/src/test/java/org/apache/uniffle/common/RssShuffleUtilsTest.java b/common/src/test/java/org/apache/uniffle/common/RssShuffleUtilsTest.java index c3488169c2..e1737a64c3 100644 --- a/common/src/test/java/org/apache/uniffle/common/RssShuffleUtilsTest.java +++ b/common/src/test/java/org/apache/uniffle/common/RssShuffleUtilsTest.java @@ -20,10 +20,12 @@ import java.nio.ByteBuffer; import org.apache.commons.lang3.RandomUtils; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; public class RssShuffleUtilsTest { @@ -46,4 +48,27 @@ public void testCompression(int size) { assertArrayEquals(data, buffer2); } + @Test + public void testDestroyDirectByteBuffer() throws Exception{ + int size = 10; + byte b = 1; + System.out.println(b); + ByteBuffer byteBuffer = ByteBuffer.allocateDirect(size); + for (int i = 0; i < size; i++) { + byteBuffer.put(b); + } + byteBuffer.flip(); + RssShuffleUtils.destroyDirectByteBuffer(byteBuffer); + Thread.sleep(200); //Memory release maybe not fast enough + boolean same = true; + byte[] read = new byte[size]; + byteBuffer.get(read); + for (byte br : read) { + if (b != br) { + same = false; + break; + } + } + assertTrue(!same); + } } From 893a77a99c7ccc380dddf438889558cafc28ecf6 Mon Sep 17 00:00:00 2001 From: xianjingfeng <583872483@qq.com> Date: Thu, 28 Jul 2022 23:31:23 +0800 Subject: [PATCH 3/5] remove unnecessary import --- .../org/apache/spark/shuffle/reader/RssShuffleDataIterator.java | 1 - 1 file changed, 1 deletion(-) diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java index f7b67b32d3..ff6635fcfe 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java +++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java @@ -18,7 +18,6 @@ package org.apache.spark.shuffle.reader; import java.io.IOException; -import java.lang.reflect.InvocationTargetException; import java.nio.ByteBuffer; import com.esotericsoftware.kryo.io.Input; From fdbd88ccb0fbf3f520abfebe1ac8e2ca92d986c1 Mon Sep 17 00:00:00 2001 From: xianjingfeng <583872483@qq.com> Date: Fri, 29 Jul 2022 10:23:52 +0800 Subject: [PATCH 4/5] remove ut and replace RuntimeException to RssException --- .../reader/RssShuffleDataIterator.java | 3 ++- .../common/exception/RssException.java | 4 +++ .../uniffle/common/RssShuffleUtilsTest.java | 25 ------------------- 3 files changed, 6 insertions(+), 26 deletions(-) diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java index ff6635fcfe..647fd1f611 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java +++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java @@ -28,6 +28,7 @@ import org.apache.spark.serializer.DeserializationStream; import org.apache.spark.serializer.Serializer; import org.apache.spark.serializer.SerializerInstance; +import org.apache.uniffle.common.exception.RssException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Product2; @@ -113,7 +114,7 @@ public boolean hasNext() { try { RssShuffleUtils.destroyDirectByteBuffer(uncompressedData); } catch (Exception e) { - throw new RuntimeException("Destroy DirectByteBuffer failed!", e); + throw new RssException("Destroy DirectByteBuffer failed!", e); } } long startDecompress = System.currentTimeMillis(); diff --git a/common/src/main/java/org/apache/uniffle/common/exception/RssException.java b/common/src/main/java/org/apache/uniffle/common/exception/RssException.java index e5c4e40bc8..93a73690fc 100644 --- a/common/src/main/java/org/apache/uniffle/common/exception/RssException.java +++ b/common/src/main/java/org/apache/uniffle/common/exception/RssException.java @@ -23,4 +23,8 @@ public class RssException extends RuntimeException { public RssException(String message) { super(message); } + + public RssException(String message, Throwable e) { + super(message, e); + } } diff --git a/common/src/test/java/org/apache/uniffle/common/RssShuffleUtilsTest.java b/common/src/test/java/org/apache/uniffle/common/RssShuffleUtilsTest.java index e1737a64c3..c3488169c2 100644 --- a/common/src/test/java/org/apache/uniffle/common/RssShuffleUtilsTest.java +++ b/common/src/test/java/org/apache/uniffle/common/RssShuffleUtilsTest.java @@ -20,12 +20,10 @@ import java.nio.ByteBuffer; import org.apache.commons.lang3.RandomUtils; -import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import static org.junit.jupiter.api.Assertions.assertArrayEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; public class RssShuffleUtilsTest { @@ -48,27 +46,4 @@ public void testCompression(int size) { assertArrayEquals(data, buffer2); } - @Test - public void testDestroyDirectByteBuffer() throws Exception{ - int size = 10; - byte b = 1; - System.out.println(b); - ByteBuffer byteBuffer = ByteBuffer.allocateDirect(size); - for (int i = 0; i < size; i++) { - byteBuffer.put(b); - } - byteBuffer.flip(); - RssShuffleUtils.destroyDirectByteBuffer(byteBuffer); - Thread.sleep(200); //Memory release maybe not fast enough - boolean same = true; - byte[] read = new byte[size]; - byteBuffer.get(read); - for (byte br : read) { - if (b != br) { - same = false; - break; - } - } - assertTrue(!same); - } } From 92c1b0a891096958152c31c357b24b43d29d8b17 Mon Sep 17 00:00:00 2001 From: xianjingfeng <583872483@qq.com> Date: Fri, 29 Jul 2022 10:57:11 +0800 Subject: [PATCH 5/5] add ut again --- .../reader/RssShuffleDataIterator.java | 2 +- .../uniffle/common/RssShuffleUtilsTest.java | 25 +++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java index 647fd1f611..bd8184c5b3 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java +++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java @@ -28,7 +28,6 @@ import org.apache.spark.serializer.DeserializationStream; import org.apache.spark.serializer.Serializer; import org.apache.spark.serializer.SerializerInstance; -import org.apache.uniffle.common.exception.RssException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Product2; @@ -39,6 +38,7 @@ import org.apache.uniffle.client.api.ShuffleReadClient; import org.apache.uniffle.client.response.CompressedShuffleBlock; import org.apache.uniffle.common.RssShuffleUtils; +import org.apache.uniffle.common.exception.RssException; public class RssShuffleDataIterator extends AbstractIterator> { diff --git a/common/src/test/java/org/apache/uniffle/common/RssShuffleUtilsTest.java b/common/src/test/java/org/apache/uniffle/common/RssShuffleUtilsTest.java index c3488169c2..781b2d55b9 100644 --- a/common/src/test/java/org/apache/uniffle/common/RssShuffleUtilsTest.java +++ b/common/src/test/java/org/apache/uniffle/common/RssShuffleUtilsTest.java @@ -20,10 +20,12 @@ import java.nio.ByteBuffer; import org.apache.commons.lang3.RandomUtils; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; public class RssShuffleUtilsTest { @@ -46,4 +48,27 @@ public void testCompression(int size) { assertArrayEquals(data, buffer2); } + @Test + public void testDestroyDirectByteBuffer() throws Exception { + int size = 10; + byte b = 1; + ByteBuffer byteBuffer = ByteBuffer.allocateDirect(size); + for (int i = 0; i < size; i++) { + byteBuffer.put(b); + } + byteBuffer.flip(); + RssShuffleUtils.destroyDirectByteBuffer(byteBuffer); + // The memory may not be released fast enough. + Thread.sleep(200); + boolean same = true; + byte[] read = new byte[size]; + byteBuffer.get(read); + for (byte br : read) { + if (b != br) { + same = false; + break; + } + } + assertTrue(!same); + } }