diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java index 38ff12a2f8..e1b97896f0 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java +++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java @@ -150,6 +150,10 @@ public boolean hasNext() { return recordsIterator.hasNext(); } + private boolean isSameMemoryType(ByteBuffer left, ByteBuffer right) { + return left.isDirect() == right.isDirect(); + } + private int uncompress(CompressedShuffleBlock rawBlock, ByteBuffer rawData) { long rawDataLength = rawData.limit() - rawData.position(); totalRawBytesLength += rawDataLength; @@ -157,7 +161,20 @@ private int uncompress(CompressedShuffleBlock rawBlock, ByteBuffer rawData) { int uncompressedLen = rawBlock.getUncompressLength(); if (codec != null) { - if (uncompressedData == null || uncompressedData.capacity() < uncompressedLen) { + if (uncompressedData == null + || uncompressedData.capacity() < uncompressedLen + || !isSameMemoryType(uncompressedData, rawData)) { + + if (LOG.isDebugEnabled()) { + if (!isSameMemoryType(uncompressedData, rawData)) { + LOG.debug( + "This should not happen that the temporary uncompressed data's memory type(isDirect:{}) " + + "is not same with fetched data buffer(isDirect:{})", + uncompressedData.isDirect(), + rawData.isDirect()); + } + } + if (uncompressedData != null) { RssUtils.releaseByteBuffer(uncompressedData); }