From d63f7d59b56710efff007b65903b1ce065188c2a Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Mon, 14 Nov 2022 10:22:15 +0800 Subject: [PATCH 1/3] [WIP] Fix potenial bug when the index reading offset equals to data length --- .../segment/FixedSizeSegmentSplitter.java | 7 +++--- .../segment/FixedSizeSegmentSplitterTest.java | 22 +++++++++++++++++++ 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java b/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java index 79e59b62d5..94d82c9862 100644 --- a/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java +++ b/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java @@ -71,16 +71,17 @@ private static List transIndexDataToSegments(byte[] indexDat fileOffset = offset; } - bufferSegments.add(new BufferSegment(blockId, bufferOffset, length, uncompressLength, crc, taskAttemptId)); - bufferOffset += length; totalLength += length; // If ShuffleServer is flushing the file at this time, the length in the index file record may be greater // than the length in the actual data file, and it needs to be returned at this time to avoid EOFException - if (dataFileLen != -1 && totalLength >= dataFileLen) { + if (dataFileLen != -1 && totalLength > dataFileLen) { break; } + bufferSegments.add(new BufferSegment(blockId, bufferOffset, length, uncompressLength, crc, taskAttemptId)); + bufferOffset += length; + if (bufferOffset >= readBufferSize) { ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments); dataFileSegments.add(sds); diff --git a/common/src/test/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitterTest.java b/common/src/test/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitterTest.java index 9282b8012a..e65a087929 100644 --- a/common/src/test/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitterTest.java +++ b/common/src/test/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitterTest.java @@ -33,6 +33,28 @@ public class FixedSizeSegmentSplitterTest { + @Test + public void testAvoidEOFException() { + SegmentSplitter splitter = new FixedSizeSegmentSplitter(1000); + byte[] data = generateData( + Pair.of(32, 0), + Pair.of(16, 0), + Pair.of(10, 0) + ); + + // case1 + List shuffleDataSegments = splitter.split(new ShuffleIndexResult(data, 49)); + assertEquals(1, shuffleDataSegments.size()); + assertEquals(0, shuffleDataSegments.get(0).getOffset()); + assertEquals(48, shuffleDataSegments.get(0).getLength()); + + // case2 + shuffleDataSegments = splitter.split(new ShuffleIndexResult(data, 48)); + assertEquals(1, shuffleDataSegments.size()); + assertEquals(0, shuffleDataSegments.get(0).getOffset()); + assertEquals(48, shuffleDataSegments.get(0).getLength()); + } + @Test public void testSplit() { SegmentSplitter splitter = new FixedSizeSegmentSplitter(100); From 56a0026ded27f53b4d4541b61ee6dcf13a9e5f92 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Mon, 14 Nov 2022 15:40:09 +0800 Subject: [PATCH 2/3] fix --- .../segment/FixedSizeSegmentSplitter.java | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java b/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java index 94d82c9862..e42f10f17b 100644 --- a/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java +++ b/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java @@ -22,13 +22,17 @@ import java.util.List; import com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.uniffle.common.BufferSegment; import org.apache.uniffle.common.ShuffleDataSegment; import org.apache.uniffle.common.ShuffleIndexResult; import org.apache.uniffle.common.exception.RssException; +import org.apache.uniffle.common.util.Constants; public class FixedSizeSegmentSplitter implements SegmentSplitter { + private static final Logger LOGGER = LoggerFactory.getLogger(FixedSizeSegmentSplitter.class); private int readBufferSize; @@ -58,12 +62,13 @@ private static List transIndexDataToSegments(byte[] indexDat while (byteBuffer.hasRemaining()) { try { - long offset = byteBuffer.getLong(); - int length = byteBuffer.getInt(); - int uncompressLength = byteBuffer.getInt(); - long crc = byteBuffer.getLong(); - long blockId = byteBuffer.getLong(); - long taskAttemptId = byteBuffer.getLong(); + final long offset = byteBuffer.getLong(); + final int length = byteBuffer.getInt(); + final int uncompressLength = byteBuffer.getInt(); + final long crc = byteBuffer.getLong(); + final long blockId = byteBuffer.getLong(); + final long taskAttemptId = byteBuffer.getLong(); + // The index file is written, read and parsed sequentially, so these parsed index segments // index a continuous shuffle data in the corresponding data file and the first segment's // offset field is the offset of these shuffle data in the data file. @@ -76,6 +81,10 @@ private static List transIndexDataToSegments(byte[] indexDat // If ShuffleServer is flushing the file at this time, the length in the index file record may be greater // than the length in the actual data file, and it needs to be returned at this time to avoid EOFException if (dataFileLen != -1 && totalLength > dataFileLen) { + long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1; + LOGGER.warn("Abort inconsistent data, the data length: {}(bytes) recorded in index file is greater than " + + "the real data file length: {}(bytes). Partition id: {}. This should not happen.", + totalLength, dataFileLen, Math.toIntExact((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & mask)); break; } From 43f2df69e0a73002b0fae6e666d78c6b5542ef98 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Tue, 15 Nov 2022 10:24:30 +0800 Subject: [PATCH 3/3] fix --- .../segment/FixedSizeSegmentSplitterTest.java | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/common/src/test/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitterTest.java b/common/src/test/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitterTest.java index e65a087929..5655288c2f 100644 --- a/common/src/test/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitterTest.java +++ b/common/src/test/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitterTest.java @@ -22,6 +22,8 @@ import org.apache.commons.lang3.tuple.Pair; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.apache.uniffle.common.ShuffleDataSegment; import org.apache.uniffle.common.ShuffleIndexResult; @@ -33,8 +35,9 @@ public class FixedSizeSegmentSplitterTest { - @Test - public void testAvoidEOFException() { + @ParameterizedTest + @ValueSource(ints = {48, 49, 57}) + public void testAvoidEOFException(int dataLength) { SegmentSplitter splitter = new FixedSizeSegmentSplitter(1000); byte[] data = generateData( Pair.of(32, 0), @@ -42,14 +45,7 @@ public void testAvoidEOFException() { Pair.of(10, 0) ); - // case1 - List shuffleDataSegments = splitter.split(new ShuffleIndexResult(data, 49)); - assertEquals(1, shuffleDataSegments.size()); - assertEquals(0, shuffleDataSegments.get(0).getOffset()); - assertEquals(48, shuffleDataSegments.get(0).getLength()); - - // case2 - shuffleDataSegments = splitter.split(new ShuffleIndexResult(data, 48)); + List shuffleDataSegments = splitter.split(new ShuffleIndexResult(data, dataLength)); assertEquals(1, shuffleDataSegments.size()); assertEquals(0, shuffleDataSegments.get(0).getOffset()); assertEquals(48, shuffleDataSegments.get(0).getLength());