diff --git a/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java b/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java
index 79e59b62d5..e42f10f17b 100644
--- a/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java
+++ b/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java
@@ -22,13 +22,17 @@
 import java.util.List;
 
 import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleDataSegment;
 import org.apache.uniffle.common.ShuffleIndexResult;
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.Constants;
 
 public class FixedSizeSegmentSplitter implements SegmentSplitter {
+  private static final Logger LOGGER = LoggerFactory.getLogger(FixedSizeSegmentSplitter.class);
 
   private int readBufferSize;
 
@@ -58,12 +62,13 @@ private static List<ShuffleDataSegment> transIndexDataToSegments(byte[] indexDat
 
     while (byteBuffer.hasRemaining()) {
       try {
-        long offset = byteBuffer.getLong();
-        int length = byteBuffer.getInt();
-        int uncompressLength = byteBuffer.getInt();
-        long crc = byteBuffer.getLong();
-        long blockId = byteBuffer.getLong();
-        long taskAttemptId = byteBuffer.getLong();
+        final long offset = byteBuffer.getLong();
+        final int length = byteBuffer.getInt();
+        final int uncompressLength = byteBuffer.getInt();
+        final long crc = byteBuffer.getLong();
+        final long blockId = byteBuffer.getLong();
+        final long taskAttemptId = byteBuffer.getLong();
+
         // The index file is written, read and parsed sequentially, so these parsed index segments
         // index a continuous shuffle data in the corresponding data file and the first segment's
         // offset field is the offset of these shuffle data in the data file.
@@ -71,16 +76,21 @@ private static List<ShuffleDataSegment> transIndexDataToSegments(byte[] indexDat
           fileOffset = offset;
         }
 
-        bufferSegments.add(new BufferSegment(blockId, bufferOffset, length, uncompressLength, crc, taskAttemptId));
-        bufferOffset += length;
         totalLength += length;
 
         // If ShuffleServer is flushing the file at this time, the length in the index file record may be greater
        // than the length in the actual data file, and it needs to be returned at this time to avoid EOFException
-        if (dataFileLen != -1 && totalLength >= dataFileLen) {
+        if (dataFileLen != -1 && totalLength > dataFileLen) {
+          long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
+          LOGGER.warn("Abort inconsistent data, the data length: {}(bytes) recorded in index file is greater than "
+              + "the real data file length: {}(bytes). Partition id: {}. This should not happen.",
+              totalLength, dataFileLen, Math.toIntExact((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & mask));
           break;
         }
 
+        bufferSegments.add(new BufferSegment(blockId, bufferOffset, length, uncompressLength, crc, taskAttemptId));
+        bufferOffset += length;
+
         if (bufferOffset >= readBufferSize) {
           ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments);
           dataFileSegments.add(sds);
diff --git a/common/src/test/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitterTest.java b/common/src/test/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitterTest.java
index 9282b8012a..5655288c2f 100644
--- a/common/src/test/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitterTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitterTest.java
@@ -22,6 +22,8 @@
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import org.apache.uniffle.common.ShuffleDataSegment;
 import org.apache.uniffle.common.ShuffleIndexResult;
@@ -33,6 +35,22 @@ public class FixedSizeSegmentSplitterTest {
 
+  @ParameterizedTest
+  @ValueSource(ints = {48, 49, 57})
+  public void testAvoidEOFException(int dataLength) {
+    SegmentSplitter splitter = new FixedSizeSegmentSplitter(1000);
+    byte[] data = generateData(
+        Pair.of(32, 0),
+        Pair.of(16, 0),
+        Pair.of(10, 0)
+    );
+
+    List<ShuffleDataSegment> shuffleDataSegments = splitter.split(new ShuffleIndexResult(data, dataLength));
+    assertEquals(1, shuffleDataSegments.size());
+    assertEquals(0, shuffleDataSegments.get(0).getOffset());
+    assertEquals(48, shuffleDataSegments.get(0).getLength());
+  }
+
   @Test
   public void testSplit() {
     SegmentSplitter splitter = new FixedSizeSegmentSplitter(100);