diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java index a3cb2700eb..751b1e0825 100644 --- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java +++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java @@ -76,6 +76,7 @@ import org.apache.uniffle.common.PartitionRange; import org.apache.uniffle.common.RemoteStorageInfo; import org.apache.uniffle.common.ShuffleAssignmentsInfo; +import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.ShuffleServerInfo; import org.apache.uniffle.common.exception.RssException; import org.apache.uniffle.common.util.Constants; @@ -220,10 +221,14 @@ public Thread newThread(Runnable r) { } LOG.info("Start to register shuffle"); long start = System.currentTimeMillis(); - serverToPartitionRanges.entrySet().forEach(entry -> { - client.registerShuffle( - entry.getKey(), appId, 0, entry.getValue(), remoteStorage); - }); + serverToPartitionRanges.entrySet().forEach(entry -> client.registerShuffle( + entry.getKey(), + appId, + 0, + entry.getValue(), + remoteStorage, + ShuffleDataDistributionType.NORMAL + )); LOG.info("Finish register shuffle with " + (System.currentTimeMillis() - start) + " ms"); return shuffleAssignments; }, retryInterval, retryTimes); diff --git a/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java b/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java index 305a9dcb8d..d75d7647b8 100644 --- a/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java +++ b/client-mr/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java @@ -37,6 +37,7 @@ import org.apache.uniffle.common.RemoteStorageInfo; import org.apache.uniffle.common.ShuffleAssignmentsInfo; import org.apache.uniffle.common.ShuffleBlockInfo; +import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.ShuffleServerInfo; import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.exception.RssException; @@ -290,7 +291,8 @@ public void registerShuffle( String appId, int shuffleId, List partitionRanges, - RemoteStorageInfo remoteStorage) { + RemoteStorageInfo remoteStorage, + ShuffleDataDistributionType distributionType) { } @Override diff --git a/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java b/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java index b5404e5963..1b920aedab 100644 --- a/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java +++ b/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java @@ -67,6 +67,7 @@ import org.apache.uniffle.common.RemoteStorageInfo; import org.apache.uniffle.common.ShuffleAssignmentsInfo; import org.apache.uniffle.common.ShuffleBlockInfo; +import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.ShuffleServerInfo; import org.apache.uniffle.common.compression.Codec; import org.apache.uniffle.common.compression.Lz4Codec; @@ -386,7 +387,8 @@ public void registerShuffle( String appId, int shuffleId, List partitionRanges, - RemoteStorageInfo storageType) { + RemoteStorageInfo storageType, + ShuffleDataDistributionType distributionType) { } diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java index 26022a54af..cf79c70c9c 100644 --- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java +++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java @@ -59,6 +59,7 @@ import org.apache.uniffle.common.RemoteStorageInfo; import org.apache.uniffle.common.ShuffleAssignmentsInfo; import org.apache.uniffle.common.ShuffleBlockInfo; +import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.ShuffleServerInfo; import org.apache.uniffle.common.exception.RssException; import org.apache.uniffle.common.util.RetryUtils; @@ -279,7 +280,13 @@ protected void registerShuffleServers( .stream() .forEach(entry -> { shuffleWriteClient.registerShuffle( - entry.getKey(), appId, shuffleId, entry.getValue(), remoteStorage); + entry.getKey(), + appId, + shuffleId, + entry.getValue(), + remoteStorage, + ShuffleDataDistributionType.NORMAL + ); }); LOG.info("Finish register shuffleId[" + shuffleId + "] with " + (System.currentTimeMillis() - start) + " ms"); } diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java index ea29a4cdb8..27368d2079 100644 --- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java +++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java @@ -64,7 +64,9 @@ import org.apache.uniffle.common.RemoteStorageInfo; import org.apache.uniffle.common.ShuffleAssignmentsInfo; import org.apache.uniffle.common.ShuffleBlockInfo; +import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.ShuffleServerInfo; +import org.apache.uniffle.common.config.RssClientConf; import org.apache.uniffle.common.exception.RssException; import org.apache.uniffle.common.util.RetryUtils; import org.apache.uniffle.common.util.RssUtils; @@ -92,6 +94,7 @@ public class RssShuffleManager implements ShuffleManager { private ScheduledExecutorService heartBeatScheduledExecutorService; private boolean heartbeatStarted = false; private boolean dynamicConfEnabled = false; + private final ShuffleDataDistributionType dataDistributionType; private final EventLoop eventLoop; private final EventLoop defaultEventLoop = new EventLoop("ShuffleDataQueue") { @@ -155,6 +158,7 @@ public RssShuffleManager(SparkConf conf, boolean isDriver) { final int retryMax = sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_MAX); this.clientType = sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE); this.dynamicConfEnabled = sparkConf.get(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED); + this.dataDistributionType = RssSparkConfig.toRssConf(sparkConf).get(RssClientConf.DATA_DISTRIBUTION_TYPE); long retryIntervalMax = sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX); int heartBeatThreadNum = sparkConf.get(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM); this.dataTransferPoolSize = sparkConf.get(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE); @@ -205,6 +209,7 @@ public RssShuffleManager(SparkConf conf, boolean isDriver) { Map> taskToFailedBlockIds) { this.sparkConf = conf; this.clientType = sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE); + this.dataDistributionType = RssSparkConfig.toRssConf(sparkConf).get(RssClientConf.DATA_DISTRIBUTION_TYPE); this.heartbeatInterval = sparkConf.get(RssSparkConfig.RSS_HEARTBEAT_INTERVAL); this.heartbeatTimeout = sparkConf.getLong(RssSparkConfig.RSS_HEARTBEAT_TIMEOUT.key(), heartbeatInterval / 2); this.dataReplica = sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA); @@ -460,7 +465,9 @@ public ShuffleReader getReaderImpl( RssUtils.generatePartitionToBitmap(blockIdBitmap, startPartition, endPartition), taskIdBitmap, readMetrics, - RssSparkConfig.toRssConf(sparkConf)); + RssSparkConfig.toRssConf(sparkConf), + dataDistributionType + ); } private Roaring64NavigableMap getExpectedTasksByExecutorId( @@ -621,7 +628,9 @@ protected void registerShuffleServers( appId, shuffleId, entry.getValue(), - remoteStorage); + remoteStorage, + dataDistributionType + ); }); LOG.info("Finish register shuffleId[" + shuffleId + "] with " + (System.currentTimeMillis() - start) + " ms"); } diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java index 2806ce8293..f194972cbb 100644 --- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java +++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java @@ -49,6 +49,7 @@ import org.apache.uniffle.client.api.ShuffleReadClient; import org.apache.uniffle.client.factory.ShuffleClientFactory; import org.apache.uniffle.client.request.CreateShuffleReadClientRequest; +import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.ShuffleServerInfo; import org.apache.uniffle.common.config.RssConf; @@ -76,6 +77,7 @@ public class RssShuffleReader implements ShuffleReader { private int mapEndIndex; private ShuffleReadMetrics readMetrics; private RssConf rssConf; + private ShuffleDataDistributionType dataDistributionType; public RssShuffleReader( int startPartition, @@ -93,7 +95,8 @@ public RssShuffleReader( Map partitionToExpectBlocks, Roaring64NavigableMap taskIdBitmap, ShuffleReadMetrics readMetrics, - RssConf rssConf) { + RssConf rssConf, + ShuffleDataDistributionType dataDistributionType) { this.appId = rssShuffleHandle.getAppId(); this.startPartition = startPartition; this.endPartition = endPartition; @@ -115,6 +118,7 @@ public RssShuffleReader( this.readMetrics = readMetrics; this.partitionToShuffleServers = rssShuffleHandle.getPartitionToServers(); this.rssConf = rssConf; + this.dataDistributionType = dataDistributionType; } @Override @@ -201,7 +205,8 @@ class MultiPartitionIterator extends AbstractIterator> { List shuffleServerInfoList = partitionToShuffleServers.get(partition); CreateShuffleReadClientRequest request = new CreateShuffleReadClientRequest( appId, shuffleId, partition, storageType, basePath, indexReadLimit, readBufferSize, - 1, partitionNum, partitionToExpectBlocks.get(partition), taskIdBitmap, shuffleServerInfoList, hadoopConf); + 1, partitionNum, partitionToExpectBlocks.get(partition), taskIdBitmap, shuffleServerInfoList, + hadoopConf, dataDistributionType); ShuffleReadClient shuffleReadClient = ShuffleClientFactory.getInstance().createShuffleReadClient(request); RssShuffleDataIterator iterator = new RssShuffleDataIterator( shuffleDependency.serializer(), shuffleReadClient, diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java index 5f8eceeb4d..213061a47f 100644 --- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java +++ b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java @@ -32,6 +32,7 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap; import scala.Option; +import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.storage.handler.impl.HdfsShuffleWriteHandler; import org.apache.uniffle.storage.util.StorageType; @@ -94,7 +95,10 @@ public void readTest() throws Exception { 1, partitionToExpectBlocks, taskIdBitmap, - new ShuffleReadMetrics(), new RssConf())); + new ShuffleReadMetrics(), + new RssConf(), + ShuffleDataDistributionType.NORMAL + )); validateResult(rssShuffleReaderSpy.read(), expectedData, 10); writeTestData(writeHandler1, 2, 4, expectedData, @@ -115,8 +119,10 @@ public void readTest() throws Exception { 2, partitionToExpectBlocks, taskIdBitmap, - new ShuffleReadMetrics(), new RssConf()) - ); + new ShuffleReadMetrics(), + new RssConf(), + ShuffleDataDistributionType.NORMAL + )); validateResult(rssShuffleReaderSpy1.read(), expectedData, 18); RssShuffleReader rssShuffleReaderSpy2 = spy(new RssShuffleReader( @@ -134,7 +140,10 @@ public void readTest() throws Exception { 2, partitionToExpectBlocks, Roaring64NavigableMap.bitmapOf(), - new ShuffleReadMetrics(), new RssConf())); + new ShuffleReadMetrics(), + new RssConf(), + ShuffleDataDistributionType.NORMAL + )); validateResult(rssShuffleReaderSpy2.read(), Maps.newHashMap(), 0); } diff --git a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java index 39df002922..4ab4b51c6c 100644 --- a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java +++ b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java @@ -28,6 +28,7 @@ import org.apache.uniffle.common.RemoteStorageInfo; import org.apache.uniffle.common.ShuffleAssignmentsInfo; import org.apache.uniffle.common.ShuffleBlockInfo; +import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.ShuffleServerInfo; public interface ShuffleWriteClient { @@ -41,7 +42,8 @@ void registerShuffle( String appId, int shuffleId, List partitionRanges, - RemoteStorageInfo remoteStorage); + RemoteStorageInfo remoteStorage, + ShuffleDataDistributionType dataDistributionType); boolean sendCommit(Set shuffleServerInfoSet, String appId, int shuffleId, int numMaps); diff --git a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java index 652f99232a..2b8b626497 100644 --- a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java +++ b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java @@ -70,6 +70,6 @@ public ShuffleReadClient createShuffleReadClient(CreateShuffleReadClientRequest request.getPartitionId(), request.getIndexReadLimit(), request.getPartitionNumPerRange(), request.getPartitionNum(), request.getReadBufferSize(), request.getBasePath(), request.getBlockIdBitmap(), request.getTaskIdBitmap(), request.getShuffleServerInfoList(), - request.getHadoopConf(), request.getIdHelper()); + request.getHadoopConf(), request.getIdHelper(), request.getShuffleDataDistributionType()); } } diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java index 5059f8793c..28e81f2969 100644 --- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java +++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java @@ -34,6 +34,7 @@ import org.apache.uniffle.client.response.CompressedShuffleBlock; import org.apache.uniffle.client.util.IdHelper; import org.apache.uniffle.common.BufferSegment; +import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.ShuffleDataResult; import org.apache.uniffle.common.ShuffleServerInfo; import org.apache.uniffle.common.exception.RssException; @@ -60,6 +61,29 @@ public class ShuffleReadClientImpl implements ShuffleReadClient { private AtomicLong crcCheckTime = new AtomicLong(0); private ClientReadHandler clientReadHandler; private final IdHelper idHelper; + private ShuffleDataDistributionType dataDistributionType = ShuffleDataDistributionType.NORMAL; + + public ShuffleReadClientImpl( + String storageType, + String appId, + int shuffleId, + int partitionId, + int indexReadLimit, + int partitionNumPerRange, + int partitionNum, + int readBufferSize, + String storageBasePath, + Roaring64NavigableMap blockIdBitmap, + Roaring64NavigableMap taskIdBitmap, + List shuffleServerInfoList, + Configuration hadoopConf, + IdHelper idHelper, + ShuffleDataDistributionType dataDistributionType) { + this(storageType, appId, shuffleId, partitionId, indexReadLimit, + partitionNumPerRange, partitionNum, readBufferSize, storageBasePath, + blockIdBitmap, taskIdBitmap, shuffleServerInfoList, hadoopConf, idHelper); + this.dataDistributionType = dataDistributionType; + } public ShuffleReadClientImpl( String storageType, @@ -96,6 +120,8 @@ public ShuffleReadClientImpl( request.setHadoopConf(hadoopConf); request.setExpectBlockIds(blockIdBitmap); request.setProcessBlockIds(processedBlockIds); + request.setDistributionType(dataDistributionType); + request.setExpectTaskIds(taskIdBitmap); List removeBlockIds = Lists.newArrayList(); blockIdBitmap.forEach(bid -> { diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java index e8372d8ec6..be83ca0fc1 100644 --- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java +++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java @@ -73,6 +73,7 @@ import org.apache.uniffle.common.RemoteStorageInfo; import org.apache.uniffle.common.ShuffleAssignmentsInfo; import org.apache.uniffle.common.ShuffleBlockInfo; +import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.ShuffleServerInfo; import org.apache.uniffle.common.exception.RssException; import org.apache.uniffle.common.util.ThreadUtils; @@ -332,7 +333,8 @@ public void registerShuffle( String appId, int shuffleId, List partitionRanges, - RemoteStorageInfo remoteStorage) { + RemoteStorageInfo remoteStorage, + ShuffleDataDistributionType dataDistributionType) { String user = null; try { user = UserGroupInformation.getCurrentUser().getShortUserName(); @@ -342,7 +344,7 @@ public void registerShuffle( LOG.info("User: {}", user); RssRegisterShuffleRequest request = - new RssRegisterShuffleRequest(appId, shuffleId, partitionRanges, remoteStorage, user); + new RssRegisterShuffleRequest(appId, shuffleId, partitionRanges, remoteStorage, user, dataDistributionType); RssRegisterShuffleResponse response = getShuffleServerClient(shuffleServerInfo).registerShuffle(request); String msg = "Error happened when registerShuffle with appId[" + appId + "], shuffleId[" + shuffleId diff --git a/client/src/main/java/org/apache/uniffle/client/request/CreateShuffleReadClientRequest.java b/client/src/main/java/org/apache/uniffle/client/request/CreateShuffleReadClientRequest.java index d9f8a754ce..2cfd021d31 100644 --- a/client/src/main/java/org/apache/uniffle/client/request/CreateShuffleReadClientRequest.java +++ b/client/src/main/java/org/apache/uniffle/client/request/CreateShuffleReadClientRequest.java @@ -24,6 +24,7 @@ import org.apache.uniffle.client.util.DefaultIdHelper; import org.apache.uniffle.client.util.IdHelper; +import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.ShuffleServerInfo; public class CreateShuffleReadClientRequest { @@ -42,6 +43,28 @@ public class CreateShuffleReadClientRequest { private List shuffleServerInfoList; private Configuration hadoopConf; private IdHelper idHelper; + private ShuffleDataDistributionType shuffleDataDistributionType = ShuffleDataDistributionType.NORMAL; + + public CreateShuffleReadClientRequest( + String appId, + int shuffleId, + int partitionId, + String storageType, + String basePath, + int indexReadLimit, + int readBufferSize, + int partitionNumPerRange, + int partitionNum, + Roaring64NavigableMap blockIdBitmap, + Roaring64NavigableMap taskIdBitmap, + List shuffleServerInfoList, + Configuration hadoopConf, + ShuffleDataDistributionType dataDistributionType) { + this(appId, shuffleId, partitionId, storageType, basePath, indexReadLimit, readBufferSize, + partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, shuffleServerInfoList, + hadoopConf, new DefaultIdHelper()); + this.shuffleDataDistributionType = dataDistributionType; + } public CreateShuffleReadClientRequest( String appId, @@ -148,4 +171,8 @@ public Configuration getHadoopConf() { public IdHelper getIdHelper() { return idHelper; } + + public ShuffleDataDistributionType getShuffleDataDistributionType() { + return shuffleDataDistributionType; + } } diff --git a/common/src/main/java/org/apache/uniffle/common/ShuffleDataDistributionType.java b/common/src/main/java/org/apache/uniffle/common/ShuffleDataDistributionType.java new file mode 100644 index 0000000000..70b1dbaab2 --- /dev/null +++ b/common/src/main/java/org/apache/uniffle/common/ShuffleDataDistributionType.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.common; + +/** + * The type of shuffle data distribution of a single partition. + */ +public enum ShuffleDataDistributionType { + NORMAL, + LOCAL_ORDER +} diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java index 99d82e038f..cb6ec2e8df 100644 --- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java +++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java @@ -17,6 +17,7 @@ package org.apache.uniffle.common.config; +import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.compression.Codec; import static org.apache.uniffle.common.compression.Codec.Type.LZ4; @@ -35,4 +36,11 @@ public class RssClientConf { .intType() .defaultValue(3) .withDescription("The zstd compression level, the default level is 3"); + + public static final ConfigOption DATA_DISTRIBUTION_TYPE = ConfigOptions + .key("rss.client.shuffle.data.distribution.type") + .enumType(ShuffleDataDistributionType.class) + .defaultValue(ShuffleDataDistributionType.NORMAL) + .withDescription("The type of partition shuffle data distribution, including normal and local_order. " + + "The default value is normal. This config is only valid in Spark3.x"); } diff --git a/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java b/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java new file mode 100644 index 0000000000..79e59b62d5 --- /dev/null +++ b/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.common.segment; + +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.util.List; + +import com.google.common.collect.Lists; + +import org.apache.uniffle.common.BufferSegment; +import org.apache.uniffle.common.ShuffleDataSegment; +import org.apache.uniffle.common.ShuffleIndexResult; +import org.apache.uniffle.common.exception.RssException; + +public class FixedSizeSegmentSplitter implements SegmentSplitter { + + private int readBufferSize; + + public FixedSizeSegmentSplitter(int readBufferSize) { + this.readBufferSize = readBufferSize; + } + + @Override + public List split(ShuffleIndexResult shuffleIndexResult) { + if (shuffleIndexResult == null || shuffleIndexResult.isEmpty()) { + return Lists.newArrayList(); + } + + byte[] indexData = shuffleIndexResult.getIndexData(); + long dataFileLen = shuffleIndexResult.getDataFileLen(); + return transIndexDataToSegments(indexData, readBufferSize, dataFileLen); + } + + private static List transIndexDataToSegments(byte[] indexData, + int readBufferSize, long dataFileLen) { + ByteBuffer byteBuffer = ByteBuffer.wrap(indexData); + List bufferSegments = Lists.newArrayList(); + List dataFileSegments = Lists.newArrayList(); + int bufferOffset = 0; + long fileOffset = -1; + long totalLength = 0; + + while (byteBuffer.hasRemaining()) { + try { + long offset = byteBuffer.getLong(); + int length = byteBuffer.getInt(); + int uncompressLength = byteBuffer.getInt(); + long crc = byteBuffer.getLong(); + long blockId = byteBuffer.getLong(); + long taskAttemptId = byteBuffer.getLong(); + // The index file is written, read and parsed sequentially, so these parsed index segments + // index a continuous shuffle data in the corresponding data file and the first segment's + // offset field is the offset of these shuffle data in the data file. + if (fileOffset == -1) { + fileOffset = offset; + } + + bufferSegments.add(new BufferSegment(blockId, bufferOffset, length, uncompressLength, crc, taskAttemptId)); + bufferOffset += length; + totalLength += length; + + // If ShuffleServer is flushing the file at this time, the length in the index file record may be greater + // than the length in the actual data file, and it needs to be returned at this time to avoid EOFException + if (dataFileLen != -1 && totalLength >= dataFileLen) { + break; + } + + if (bufferOffset >= readBufferSize) { + ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments); + dataFileSegments.add(sds); + bufferSegments = Lists.newArrayList(); + bufferOffset = 0; + fileOffset = -1; + } + } catch (BufferUnderflowException ue) { + throw new RssException("Read index data under flow", ue); + } + } + + if (bufferOffset > 0) { + ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments); + dataFileSegments.add(sds); + } + + return dataFileSegments; + } +} diff --git a/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java b/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java new file mode 100644 index 0000000000..77e02e06d6 --- /dev/null +++ b/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.common.segment; + +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.util.List; + +import com.google.common.collect.Lists; +import org.roaringbitmap.longlong.Roaring64NavigableMap; + +import org.apache.uniffle.common.BufferSegment; +import org.apache.uniffle.common.ShuffleDataSegment; +import org.apache.uniffle.common.ShuffleIndexResult; +import org.apache.uniffle.common.exception.RssException; + +/** + * {@class LocalOrderSegmentSplitter} will be initialized only when the {@class ShuffleDataDistributionType} + * is LOCAL_ORDER, which means the index file will be split into several segments according to its + * locally ordered properties. And it will skip some blocks, but the remaining blocks in a segment + * are continuous. + * + * This strategy will be useful for Spark AQE skew optimization, it will split the single partition into + * multiple shuffle readers, and each one will fetch partial single partition data which is in the range of + * [StartMapId, endMapId). And so if one reader uses this, it will skip lots of unnecessary blocks. + * + * Last but not least, this split strategy depends on LOCAL_ORDER of index file, which must be guaranteed by + * the shuffle server. + */ +public class LocalOrderSegmentSplitter implements SegmentSplitter { + + private Roaring64NavigableMap expectTaskIds; + private int readBufferSize; + + public LocalOrderSegmentSplitter(Roaring64NavigableMap expectTaskIds, int readBufferSize) { + this.expectTaskIds = expectTaskIds; + this.readBufferSize = readBufferSize; + } + + @Override + public List split(ShuffleIndexResult shuffleIndexResult) { + if (shuffleIndexResult == null || shuffleIndexResult.isEmpty()) { + return Lists.newArrayList(); + } + + byte[] indexData = shuffleIndexResult.getIndexData(); + long dataFileLen = shuffleIndexResult.getDataFileLen(); + + ByteBuffer byteBuffer = ByteBuffer.wrap(indexData); + List bufferSegments = Lists.newArrayList(); + + List dataFileSegments = Lists.newArrayList(); + int bufferOffset = 0; + long fileOffset = -1; + long totalLen = 0; + + long lastTaskAttemptId = -1; + + /** + * One ShuffleDataSegment should meet following requirements: + * + * 1. taskId in [startMapId, endMapId) taskIds bitmap + * 2. ShuffleDataSegment size should < readBufferSize + * 3. ShuffleDataSegment's blocks should be continuous + * + */ + while (byteBuffer.hasRemaining()) { + try { + long offset = byteBuffer.getLong(); + int length = byteBuffer.getInt(); + int uncompressLength = byteBuffer.getInt(); + long crc = byteBuffer.getLong(); + long blockId = byteBuffer.getLong(); + long taskAttemptId = byteBuffer.getLong(); + + if (lastTaskAttemptId == -1) { + lastTaskAttemptId = taskAttemptId; + } + + // If ShuffleServer is flushing the file at this time, the length in the index file record may be greater + // than the length in the actual data file, and it needs to be returned at this time to avoid EOFException + if (dataFileLen != -1 && totalLen >= dataFileLen) { + break; + } + + if ((taskAttemptId < lastTaskAttemptId && bufferSegments.size() > 0) || bufferOffset >= readBufferSize) { + ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments); + dataFileSegments.add(sds); + bufferSegments = Lists.newArrayList(); + bufferOffset = 0; + fileOffset = -1; + } + + if (expectTaskIds.contains(taskAttemptId)) { + if (fileOffset == -1) { + fileOffset = offset; + } + bufferSegments.add(new BufferSegment(blockId, bufferOffset, length, uncompressLength, crc, taskAttemptId)); + bufferOffset += length; + } + + lastTaskAttemptId = taskAttemptId; + } catch (BufferUnderflowException ue) { + throw new RssException("Read index data under flow", ue); + } + } + + if (bufferOffset > 0) { + ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments); + dataFileSegments.add(sds); + } + + return dataFileSegments; + } +} diff --git a/common/src/main/java/org/apache/uniffle/common/segment/SegmentSplitter.java b/common/src/main/java/org/apache/uniffle/common/segment/SegmentSplitter.java new file mode 100644 index 0000000000..6bf2432778 --- /dev/null +++ b/common/src/main/java/org/apache/uniffle/common/segment/SegmentSplitter.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.common.segment; + +import java.util.List; + +import org.apache.uniffle.common.ShuffleDataSegment; +import org.apache.uniffle.common.ShuffleIndexResult; + +public interface SegmentSplitter { + + List split(ShuffleIndexResult shuffleIndexResult); + +} diff --git a/common/src/main/java/org/apache/uniffle/common/segment/SegmentSplitterFactory.java b/common/src/main/java/org/apache/uniffle/common/segment/SegmentSplitterFactory.java new file mode 100644 index 0000000000..0fc0ff08d3 --- /dev/null +++ b/common/src/main/java/org/apache/uniffle/common/segment/SegmentSplitterFactory.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.common.segment; + +import org.roaringbitmap.longlong.Roaring64NavigableMap; + +import org.apache.uniffle.common.ShuffleDataDistributionType; + +public class SegmentSplitterFactory { + + private SegmentSplitterFactory() { + // ignore + } + + private static class LazyHolder { + static final SegmentSplitterFactory INSTANCE = new SegmentSplitterFactory(); + } + + public SegmentSplitter get( + ShuffleDataDistributionType distributionType, + Roaring64NavigableMap expectTaskIds, + int readBufferSize) { + switch (distributionType) { + case LOCAL_ORDER: + return new LocalOrderSegmentSplitter(expectTaskIds, readBufferSize); + case NORMAL: + default: + return new FixedSizeSegmentSplitter(readBufferSize); + } + } + + public static SegmentSplitterFactory getInstance() { + return LazyHolder.INSTANCE; + } +} diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java index 848d6d01d6..16e21da36b 100644 --- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java +++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java @@ -30,8 +30,6 @@ import java.net.InetAddress; import java.net.InterfaceAddress; import java.net.NetworkInterface; -import java.nio.BufferUnderflowException; -import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Enumeration; import java.util.HashMap; @@ -49,11 +47,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.uniffle.common.BufferSegment; -import org.apache.uniffle.common.ShuffleDataSegment; -import org.apache.uniffle.common.ShuffleIndexResult; import org.apache.uniffle.common.ShuffleServerInfo; -import org.apache.uniffle.common.exception.RssException; public class RssUtils { @@ -184,71 +178,6 @@ public static Roaring64NavigableMap cloneBitMap(Roaring64NavigableMap bitmap) { return clone; } - public static List transIndexDataToSegments( - ShuffleIndexResult shuffleIndexResult, int readBufferSize) { - if (shuffleIndexResult == null || shuffleIndexResult.isEmpty()) { - return Lists.newArrayList(); - } - - byte[] indexData = shuffleIndexResult.getIndexData(); - long dataFileLen = shuffleIndexResult.getDataFileLen(); - return transIndexDataToSegments(indexData, readBufferSize, dataFileLen); - } - - private static List transIndexDataToSegments(byte[] indexData, - int readBufferSize, long dataFileLen) { - ByteBuffer byteBuffer = ByteBuffer.wrap(indexData); - List bufferSegments = Lists.newArrayList(); - List dataFileSegments = Lists.newArrayList(); - int bufferOffset = 0; - long fileOffset = -1; - long totalLength = 0; - - while (byteBuffer.hasRemaining()) { - try { - long offset = byteBuffer.getLong(); - int length = byteBuffer.getInt(); - int uncompressLength = byteBuffer.getInt(); - long crc = byteBuffer.getLong(); - long blockId = byteBuffer.getLong(); - long taskAttemptId = byteBuffer.getLong(); - // The index file is written, read and parsed sequentially, so these parsed index segments - // index a continuous shuffle data in the corresponding data file and the first segment's - // offset field is the offset of these shuffle data in the data file. - if (fileOffset == -1) { - fileOffset = offset; - } - - bufferSegments.add(new BufferSegment(blockId, bufferOffset, length, uncompressLength, crc, taskAttemptId)); - bufferOffset += length; - totalLength += length; - - // If ShuffleServer is flushing the file at this time, the length in the index file record may be greater - // than the length in the actual data file, and it needs to be returned at this time to avoid EOFException - if (dataFileLen != -1 && totalLength >= dataFileLen) { - break; - } - - if (bufferOffset >= readBufferSize) { - ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments); - dataFileSegments.add(sds); - bufferSegments = Lists.newArrayList(); - bufferOffset = 0; - fileOffset = -1; - } - } catch (BufferUnderflowException ue) { - throw new RssException("Read index data under flow", ue); - } - } - - if (bufferOffset > 0) { - ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments); - dataFileSegments.add(sds); - } - - return dataFileSegments; - } - public static String generateShuffleKey(String appId, int shuffleId) { return String.join(Constants.KEY_SPLIT_CHAR, appId, String.valueOf(shuffleId)); } diff --git a/common/src/test/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitterTest.java b/common/src/test/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitterTest.java new file mode 100644 index 0000000000..9282b8012a --- /dev/null +++ b/common/src/test/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitterTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.common.segment; + +import java.nio.ByteBuffer; +import java.util.List; + +import org.apache.commons.lang3.tuple.Pair; +import org.junit.jupiter.api.Test; + +import org.apache.uniffle.common.ShuffleDataSegment; +import org.apache.uniffle.common.ShuffleIndexResult; + +import static org.apache.uniffle.common.segment.LocalOrderSegmentSplitterTest.generateData; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class FixedSizeSegmentSplitterTest { + + @Test + public void testSplit() { + SegmentSplitter splitter = new FixedSizeSegmentSplitter(100); + ShuffleIndexResult shuffleIndexResult = new ShuffleIndexResult(); + List shuffleDataSegments = splitter.split(shuffleIndexResult); + assertTrue(shuffleDataSegments.isEmpty()); + + int readBufferSize = 32; + splitter = new FixedSizeSegmentSplitter(32); + + // those 5 segment's data length are [32, 16, 10, 32, 6] so the index should be + // split into 3 ShuffleDataSegment, which are [32, 16 + 10 + 32, 6] + byte[] data = generateData( + Pair.of(32, 0), + Pair.of(16, 0), + Pair.of(10, 0), + Pair.of(32, 6), + Pair.of(6, 0) + ); + shuffleDataSegments = splitter.split(new ShuffleIndexResult(data, -1)); + assertEquals(3, shuffleDataSegments.size()); + + assertEquals(0, shuffleDataSegments.get(0).getOffset()); + assertEquals(32, shuffleDataSegments.get(0).getLength()); + assertEquals(1, shuffleDataSegments.get(0).getBufferSegments().size()); + + assertEquals(32, shuffleDataSegments.get(1).getOffset()); + assertEquals(58, shuffleDataSegments.get(1).getLength()); + assertEquals(3,shuffleDataSegments.get(1).getBufferSegments().size()); + + assertEquals(90, shuffleDataSegments.get(2).getOffset()); + assertEquals(6, shuffleDataSegments.get(2).getLength()); + assertEquals(1, shuffleDataSegments.get(2).getBufferSegments().size()); + + ByteBuffer incompleteByteBuffer = ByteBuffer.allocate(12); + incompleteByteBuffer.putLong(1L); + incompleteByteBuffer.putInt(2); + data = incompleteByteBuffer.array(); + // It should throw exception + try { + splitter.split(new ShuffleIndexResult(data, -1)); + fail(); + } catch (Exception e) { + // ignore + assertTrue(e.getMessage().contains("Read index data under flow")); + } + } +} diff --git a/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java b/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java new file mode 100644 index 0000000000..27a2981448 --- /dev/null +++ b/common/src/test/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitterTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.common.segment; + +import java.nio.ByteBuffer; +import java.util.List; + +import org.apache.commons.lang3.tuple.Pair; +import org.junit.jupiter.api.Test; +import org.roaringbitmap.longlong.Roaring64NavigableMap; + +import org.apache.uniffle.common.ShuffleDataSegment; +import org.apache.uniffle.common.ShuffleIndexResult; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class LocalOrderSegmentSplitterTest { + + @Test + public void testSplit() { + Roaring64NavigableMap taskIds = Roaring64NavigableMap.bitmapOf(1); + LocalOrderSegmentSplitter splitter = new LocalOrderSegmentSplitter(taskIds, 1000); + assertTrue(splitter.split(new ShuffleIndexResult()).isEmpty()); + + splitter = new LocalOrderSegmentSplitter(taskIds, 32); + + /** + * (length, taskId) + * case1: (32, 1) (16, 1) (10, 2) (16, 1) (6, 1) + * + * (10, 2) will be dropped + */ + byte[] data = generateData( + Pair.of(32, 1), + Pair.of(16, 1), + Pair.of(10, 2), + Pair.of(16, 1), + Pair.of(6, 1) + ); + List dataSegments = splitter.split(new ShuffleIndexResult(data, -1)); + assertEquals(3, dataSegments.size()); + + assertEquals(0, dataSegments.get(0).getOffset()); + assertEquals(32, dataSegments.get(0).getLength()); + + assertEquals(32, dataSegments.get(1).getOffset()); + assertEquals(16, dataSegments.get(1).getLength()); + + assertEquals(58, dataSegments.get(2).getOffset()); + assertEquals(22, dataSegments.get(2).getLength()); + + /** + * case2: (32, 2) (16, 1) (10, 1) (16, 2) (6, 1) + * + * (32, 2) (16, 2) will be dropped + */ + data = generateData( + Pair.of(32, 2), + Pair.of(16, 1), + Pair.of(10, 1), + Pair.of(16, 2), + Pair.of(6, 1) + ); + dataSegments = splitter.split(new ShuffleIndexResult(data, -1)); + assertEquals(2, dataSegments.size()); + + assertEquals(32, dataSegments.get(0).getOffset()); + assertEquals(26, dataSegments.get(0).getLength()); + + assertEquals(74, dataSegments.get(1).getOffset()); + assertEquals(6, dataSegments.get(1).getLength()); + + /** + * case3: (32, 5) (16, 1) (10, 3) (16, 4) (6, 1) + * + * (32, 5) will be dropped + */ + taskIds = Roaring64NavigableMap.bitmapOf(1, 2, 3, 4); + splitter = new LocalOrderSegmentSplitter(taskIds, 32); + data = generateData( + Pair.of(32, 5), + Pair.of(16, 1), + Pair.of(10, 3), + Pair.of(16, 4), + Pair.of(6, 1) + ); + dataSegments = splitter.split(new ShuffleIndexResult(data, -1)); + assertEquals(2, dataSegments.size()); + + assertEquals(32, dataSegments.get(0).getOffset()); + assertEquals(42, dataSegments.get(0).getLength()); + + assertEquals(74, dataSegments.get(1).getOffset()); + assertEquals(6, dataSegments.get(1).getLength()); + } + + public static byte[] generateData(Pair... configEntries) { + ByteBuffer byteBuffer = ByteBuffer.allocate(configEntries.length * 40); + int total = 0; + for (Pair entry : configEntries) { + byteBuffer.putLong(total); + byteBuffer.putInt(entry.getLeft()); + byteBuffer.putInt(1); + byteBuffer.putLong(1); + byteBuffer.putLong(1); + byteBuffer.putLong(entry.getRight()); + + total += entry.getLeft(); + } + return byteBuffer.array(); + } +} diff --git a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java index 4c1f5a8c5f..a84e3902ba 100644 --- a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java +++ b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java @@ -21,7 +21,6 @@ import java.net.Inet4Address; import java.net.InetAddress; import java.net.NetworkInterface; -import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; import java.util.Map; @@ -35,9 +34,6 @@ import org.junit.jupiter.api.Test; import org.roaringbitmap.longlong.Roaring64NavigableMap; -import org.apache.uniffle.common.BufferSegment; -import org.apache.uniffle.common.ShuffleDataSegment; -import org.apache.uniffle.common.ShuffleIndexResult; import org.apache.uniffle.common.ShuffleServerInfo; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -103,69 +99,6 @@ public void testCloneBitmap() { assertEquals(bitmap1, bitmap2); } - @Test - public void testShuffleIndexSegment() { - ShuffleIndexResult shuffleIndexResult = new ShuffleIndexResult(); - List shuffleDataSegments = - RssUtils.transIndexDataToSegments(shuffleIndexResult, 1000); - assertTrue(shuffleDataSegments.isEmpty()); - - int readBufferSize = 32; - int totalLength = 0; - List bufferSegments = Lists.newArrayList(); - int[] dataSegmentLength = new int[]{32, 16, 10, 32, 6}; - - for (int i = 0; i < dataSegmentLength.length; ++i) { - long offset = totalLength; - int length = dataSegmentLength[i]; - bufferSegments.add(new BufferSegment(i, offset, length, i, i, i)); - totalLength += length; - } - - // those 5 segment's data length are [32, 16, 10, 32, 6] so the index should be - // split into 3 ShuffleDataSegment, which are [32, 16 + 10 + 32, 6] - int expectedTotalSegmentNum = 3; - ByteBuffer byteBuffer = ByteBuffer.allocate(5 * 40); - - for (BufferSegment bufferSegment : bufferSegments) { - byteBuffer.putLong(bufferSegment.getOffset()); - byteBuffer.putInt(bufferSegment.getLength()); - byteBuffer.putInt(bufferSegment.getUncompressLength()); - byteBuffer.putLong(bufferSegment.getCrc()); - byteBuffer.putLong(bufferSegment.getBlockId()); - byteBuffer.putLong(bufferSegment.getTaskAttemptId()); - } - - byte[] data = byteBuffer.array(); - shuffleDataSegments = RssUtils.transIndexDataToSegments(new ShuffleIndexResult(data, -1), readBufferSize); - assertEquals(expectedTotalSegmentNum, shuffleDataSegments.size()); - - assertEquals(0, shuffleDataSegments.get(0).getOffset()); - assertEquals(32, shuffleDataSegments.get(0).getLength()); - assertEquals(1, shuffleDataSegments.get(0).getBufferSegments().size()); - - assertEquals(32, shuffleDataSegments.get(1).getOffset()); - assertEquals(58, shuffleDataSegments.get(1).getLength()); - assertEquals(3,shuffleDataSegments.get(1).getBufferSegments().size()); - - assertEquals(90, shuffleDataSegments.get(2).getOffset()); - assertEquals(6, shuffleDataSegments.get(2).getLength()); - assertEquals(1, shuffleDataSegments.get(2).getBufferSegments().size()); - - ByteBuffer incompleteByteBuffer = ByteBuffer.allocate(12); - incompleteByteBuffer.putLong(1L); - incompleteByteBuffer.putInt(2); - data = incompleteByteBuffer.array(); - // It should throw exception - try { - RssUtils.transIndexDataToSegments(new ShuffleIndexResult(data, -1), readBufferSize); - fail(); - } catch (Exception e) { - // ignore - assertTrue(e.getMessage().contains("Read index data under flow")); - } - } - @Test public void getMetricNameForHostNameTest() { assertEquals("a_b_c", RssUtils.getMetricNameForHostName("a.b.c")); diff --git a/docs/client_guide.md b/docs/client_guide.md index c945802dc3..0699ce5516 100644 --- a/docs/client_guide.md +++ b/docs/client_guide.md @@ -54,6 +54,17 @@ After apply the patch and rebuild spark, add following configuration in spark co spark.dynamicAllocation.enabled true ``` +### Support Spark AQE + +To improve performance of AQE skew optimization, uniffle introduces the LOCAL_ORDER shuffle-data distribution mechanism +to filter the lots of data to reduce network bandwidth and shuffle-server local-disk pressure. + +It can be enabled by the following config + ```bash + # Default value is NORMAL, it will directly append to file when the memory data is flushed to external storage + spark.rss.client.shuffle.data.distribution.type LOCAL_ORDER + ``` + ### Deploy MapReduce Client Plugin 1. Add client jar to the classpath of each NodeManager, e.g., /share/hadoop/mapreduce/ @@ -91,6 +102,7 @@ These configurations are shared by all types of clients. |.rss.client.assignment.shuffle.nodes.max|-1|The number of required assignment shuffle servers. If it is less than 0 or equals to 0 or greater than the coordinator's config of "rss.coordinator.shuffle.nodes.max", it will use the size of "rss.coordinator.shuffle.nodes.max" default| |.rss.client.io.compression.codec|lz4|The compression codec is used to compress the shuffle data. Default codec is `lz4`, `zstd` also can be used.| |.rss.client.io.compression.zstd.level|3|The zstd compression level, the default level is 3| +|.rss.client.shuffle.data.distribution.type|NORMAL|The type of partition shuffle data distribution, including normal and local_order. The default value is normal. Now this config is only valid in Spark3.x| Notice: 1. `` should be `spark` or `mapreduce` diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java index 2ea3d5c0dc..f4791df92e 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java @@ -42,6 +42,7 @@ import org.apache.uniffle.common.PartitionRange; import org.apache.uniffle.common.RemoteStorageInfo; import org.apache.uniffle.common.ShuffleBlockInfo; +import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.ShuffleServerInfo; import org.apache.uniffle.common.util.RssUtils; import org.apache.uniffle.coordinator.CoordinatorConf; @@ -272,8 +273,14 @@ private void registerShuffleServer(String testAppId, shuffleServerInfo2, shuffleServerInfo3, shuffleServerInfo4); for (int i = 0; i < replica; i++) { - shuffleWriteClientImpl.registerShuffle(allServers.get(i), - testAppId, 0, Lists.newArrayList(new PartitionRange(0, 0)), new RemoteStorageInfo("")); + shuffleWriteClientImpl.registerShuffle( + allServers.get(i), + testAppId, + 0, + Lists.newArrayList(new PartitionRange(0, 0)), + new RemoteStorageInfo(""), + ShuffleDataDistributionType.NORMAL + ); } } diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java index 8c0a678c56..ca0afadcda 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java @@ -39,9 +39,10 @@ import org.apache.uniffle.common.ShuffleDataSegment; import org.apache.uniffle.common.ShuffleIndexResult; import org.apache.uniffle.common.ShuffleServerInfo; +import org.apache.uniffle.common.segment.FixedSizeSegmentSplitter; +import org.apache.uniffle.common.segment.SegmentSplitter; import org.apache.uniffle.common.util.ChecksumUtils; import org.apache.uniffle.common.util.Constants; -import org.apache.uniffle.common.util.RssUtils; public abstract class ShuffleReadWriteBase extends IntegrationTestBase { @@ -131,8 +132,7 @@ public static List readShuffleIndexSegments( RssGetShuffleIndexRequest rgsir = new RssGetShuffleIndexRequest( appId, shuffleId, partitionId, partitionNumPerRange, partitionNum); ShuffleIndexResult shuffleIndexResult = shuffleServerClient.getShuffleIndex(rgsir).getShuffleIndexResult(); - return RssUtils.transIndexDataToSegments(shuffleIndexResult, readBufferSize); - + return new FixedSizeSegmentSplitter(readBufferSize).split(shuffleIndexResult); } public static ShuffleDataResult readShuffleData( @@ -167,28 +167,51 @@ public static ShuffleDataResult readShuffleData( int partitionNumPerRange, int partitionNum, int readBufferSize, - int segmentIndex) { - // read index file + int segmentIndex, + SegmentSplitter segmentSplitter) { RssGetShuffleIndexRequest rgsir = new RssGetShuffleIndexRequest( appId, shuffleId, partitionId, partitionNumPerRange, partitionNum); ShuffleIndexResult shuffleIndexResult = shuffleServerClient.getShuffleIndex(rgsir).getShuffleIndexResult(); if (shuffleIndexResult == null) { return new ShuffleDataResult(); } - List sds = RssUtils.transIndexDataToSegments(shuffleIndexResult, readBufferSize); + List sds = segmentSplitter.split(shuffleIndexResult); if (segmentIndex >= sds.size()) { return new ShuffleDataResult(); } - // read shuffle data ShuffleDataSegment segment = sds.get(segmentIndex); RssGetShuffleDataRequest rgsdr = new RssGetShuffleDataRequest( appId, shuffleId, partitionId, partitionNumPerRange, partitionNum, segment.getOffset(), segment.getLength()); + // read shuffle data return new ShuffleDataResult( shuffleServerClient.getShuffleData(rgsdr).getShuffleData(), segment.getBufferSegments()); } + + public static ShuffleDataResult readShuffleData( + ShuffleServerGrpcClient shuffleServerClient, + String appId, + int shuffleId, + int partitionId, + int partitionNumPerRange, + int partitionNum, + int readBufferSize, + int segmentIndex) { + // read index file + return readShuffleData( + shuffleServerClient, + appId, + shuffleId, + partitionId, + partitionNumPerRange, + partitionNum, + readBufferSize, + segmentIndex, + new FixedSizeSegmentSplitter(readBufferSize) + ); + } } diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java index be51772af5..9b3d8488e9 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java @@ -54,6 +54,7 @@ import org.apache.uniffle.common.PartitionRange; import org.apache.uniffle.common.RemoteStorageInfo; import org.apache.uniffle.common.ShuffleBlockInfo; +import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.ShuffleServerInfo; import org.apache.uniffle.common.config.RssBaseConf; import org.apache.uniffle.common.util.Constants; @@ -112,7 +113,10 @@ public void clearResourceTest() throws Exception { new ShuffleServerInfo("127.0.0.1-20001", "127.0.0.1", 20001), "clearResourceTest1", 0, - Lists.newArrayList(new PartitionRange(0, 1)), new RemoteStorageInfo("")); + Lists.newArrayList(new PartitionRange(0, 1)), + new RemoteStorageInfo(""), + ShuffleDataDistributionType.NORMAL + ); shuffleWriteClient.sendAppHeartbeat("clearResourceTest1", 1000L); shuffleWriteClient.sendAppHeartbeat("clearResourceTest2", 1000L); diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHdfsTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHdfsTest.java index 88c5438ffd..ed2bec66a9 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHdfsTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHdfsTest.java @@ -46,6 +46,7 @@ import org.apache.uniffle.common.PartitionRange; import org.apache.uniffle.common.RemoteStorageInfo; import org.apache.uniffle.common.ShuffleBlockInfo; +import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.coordinator.CoordinatorConf; import org.apache.uniffle.coordinator.CoordinatorServer; import org.apache.uniffle.server.ShuffleServer; @@ -178,7 +179,8 @@ public void hdfsWriteReadTest() throws Exception { 0, Lists.newArrayList(new PartitionRange(0, 1)), remoteStorageInfo, - user + user, + ShuffleDataDistributionType.NORMAL ); shuffleServerClient.registerShuffle(rrsr); @@ -187,7 +189,8 @@ public void hdfsWriteReadTest() throws Exception { 0, Lists.newArrayList(new PartitionRange(2, 3)), remoteStorageInfo, - user + user, + ShuffleDataDistributionType.NORMAL ); shuffleServerClient.registerShuffle(rrsr); diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfLocalOrderTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfLocalOrderTest.java new file mode 100644 index 0000000000..06a37b2ace --- /dev/null +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfLocalOrderTest.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.test; + +import java.io.File; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.io.Files; +import org.apache.commons.lang3.tuple.Pair; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.roaringbitmap.longlong.Roaring64NavigableMap; + +import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient; +import org.apache.uniffle.client.request.RssFinishShuffleRequest; +import org.apache.uniffle.client.request.RssRegisterShuffleRequest; +import org.apache.uniffle.client.request.RssSendCommitRequest; +import org.apache.uniffle.client.request.RssSendShuffleDataRequest; +import org.apache.uniffle.client.util.DefaultIdHelper; +import org.apache.uniffle.common.BufferSegment; +import org.apache.uniffle.common.PartitionRange; +import org.apache.uniffle.common.RemoteStorageInfo; +import org.apache.uniffle.common.ShuffleBlockInfo; +import org.apache.uniffle.common.ShuffleDataResult; +import org.apache.uniffle.common.segment.LocalOrderSegmentSplitter; +import org.apache.uniffle.common.util.ChecksumUtils; +import org.apache.uniffle.coordinator.CoordinatorConf; +import org.apache.uniffle.server.ShuffleServerConf; +import org.apache.uniffle.storage.util.StorageType; + +import static org.apache.uniffle.common.ShuffleDataDistributionType.LOCAL_ORDER; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +/** + * This class is to test the local_order shuffle-data distribution + */ +public class ShuffleServerWithLocalOfLocalOrderTest extends ShuffleReadWriteBase { + + private ShuffleServerGrpcClient shuffleServerClient; + + @BeforeAll + public static void setupServers() throws Exception { + CoordinatorConf coordinatorConf = getCoordinatorConf(); + createCoordinatorServer(coordinatorConf); + ShuffleServerConf shuffleServerConf = getShuffleServerConf(); + File tmpDir = Files.createTempDir(); + File dataDir1 = new File(tmpDir, "data1"); + File dataDir2 = new File(tmpDir, "data2"); + String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath(); + shuffleServerConf.setString("rss.storage.type", StorageType.LOCALFILE.name()); + shuffleServerConf.setString("rss.storage.basePath", basePath); + shuffleServerConf.setString("rss.server.app.expired.withoutHeartbeat", "5000"); + createShuffleServer(shuffleServerConf); + startServers(); + } + + @BeforeEach + public void createClient() { + shuffleServerClient = new ShuffleServerGrpcClient(LOCALHOST, SHUFFLE_SERVER_PORT); + } + + @AfterEach + public void closeClient() { + shuffleServerClient.close(); + } + + public static Map>> createTestDataWithMultiMapIdx( + Roaring64NavigableMap[] bitmaps, + Map expectedData) { + for (int i = 0; i < 4; i++) { + bitmaps[i] = Roaring64NavigableMap.bitmapOf(); + } + + // key: mapIdx + Map> p0 = new HashMap<>(); + List blocks1 = createShuffleBlockList( + 0, 0, 0, 3, 25, bitmaps[0], expectedData, mockSSI); + List blocks2 = createShuffleBlockList( + 0, 0, 1, 3, 25, bitmaps[0], expectedData, mockSSI); + List blocks3 = createShuffleBlockList( + 0, 0, 2, 3, 25, bitmaps[0], expectedData, mockSSI); + p0.put(0, blocks1); + p0.put(1, blocks2); + p0.put(2, blocks3); + + final List blocks4 = createShuffleBlockList( + 0, 1, 1, 5, 25, bitmaps[1], expectedData, mockSSI); + final Map> p1 = new HashMap<>(); + p1.put(1, blocks4); + + final List blocks5 = createShuffleBlockList( + 0, 2, 2, 4, 25, bitmaps[2], expectedData, mockSSI); + final Map> p2 = new HashMap<>(); + p2.put(2, blocks5); + + final List blocks6 = createShuffleBlockList( + 0, 3, 3, 1, 25, bitmaps[3], expectedData, mockSSI); + final Map> p3 = new HashMap<>(); + p1.put(3, blocks6); + + Map>> partitionToBlocks = Maps.newHashMap(); + partitionToBlocks.put(0, p0); + partitionToBlocks.put(1, p1); + partitionToBlocks.put(2, p2); + partitionToBlocks.put(3, p3); + return partitionToBlocks; + } + + @Test + public void testWriteAndReadWithSpecifiedMapRange() throws Exception { + String testAppId = "testWriteAndReadWithSpecifiedMapRange"; + + for (int i = 0; i < 4; i++) { + RssRegisterShuffleRequest rrsr = new RssRegisterShuffleRequest(testAppId, 0, + Lists.newArrayList(new PartitionRange(i, i)), new RemoteStorageInfo(""), "", LOCAL_ORDER); + shuffleServerClient.registerShuffle(rrsr); + } + + /** + * Write the data to shuffle-servers + */ + Map expectedData = Maps.newHashMap(); + Roaring64NavigableMap[] bitMaps = new Roaring64NavigableMap[4]; + + // Create the shuffle block with the mapIdx + Map>> partitionToBlocksWithMapIdx = + createTestDataWithMultiMapIdx(bitMaps, expectedData); + + Map> partitionToBlocks = partitionToBlocksWithMapIdx.entrySet() + .stream() + .map(x -> + Pair.of(x.getKey(), x.getValue().values().stream().flatMap(a -> a.stream()).collect(Collectors.toList())) + ) + .collect(Collectors.toMap(Pair::getLeft, Pair::getRight)); + + Map>> shuffleToBlocks = Maps.newHashMap(); + shuffleToBlocks.put(0, partitionToBlocks); + + RssSendShuffleDataRequest rssdr = new RssSendShuffleDataRequest( + testAppId, 3, 1000, shuffleToBlocks); + shuffleServerClient.sendShuffleData(rssdr); + + // Flush the data to file + RssSendCommitRequest rscr = new RssSendCommitRequest(testAppId, 0); + shuffleServerClient.sendCommit(rscr); + RssFinishShuffleRequest rfsr = new RssFinishShuffleRequest(testAppId, 0); + shuffleServerClient.finishShuffle(rfsr); + + /** + * Read the single partition data by specified [startMapIdx, endMapIdx) + */ + // case1: get the mapIdx range [0, 1) of partition0 + final Set expectedBlockIds1 = partitionToBlocksWithMapIdx.get(0).get(0).stream() + .map(x -> x.getBlockId()) + .collect(Collectors.toSet()); + final Map expectedData1 = expectedData.entrySet().stream() + .filter(x -> expectedBlockIds1.contains(x.getKey())) + .collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue())); + + Roaring64NavigableMap taskIds = Roaring64NavigableMap.bitmapOf(0); + ShuffleDataResult sdr = readShuffleData( + shuffleServerClient, + testAppId, + 0, + 0, + 1, + 10, + 1000, + 0, + new LocalOrderSegmentSplitter(taskIds, 1000) + ); + validate( + sdr, + expectedBlockIds1, + expectedData1, + new HashSet<>(Arrays.asList(0L)) + ); + + // case2: get the mapIdx range [0, 2) of partition0 + final Set expectedBlockIds2 = partitionToBlocksWithMapIdx.get(0).get(1).stream() + .map(x -> x.getBlockId()) + .collect(Collectors.toSet()); + expectedBlockIds2.addAll(expectedBlockIds1); + final Map expectedData2 = expectedData.entrySet().stream() + .filter(x -> expectedBlockIds2.contains(x.getKey())) + .collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue())); + taskIds = Roaring64NavigableMap.bitmapOf(0, 1); + sdr = readShuffleData( + shuffleServerClient, + testAppId, + 0, + 0, + 1, + 10, + 1000, + 0, + new LocalOrderSegmentSplitter(taskIds, 1000) + ); + validate( + sdr, + expectedBlockIds2, + expectedData2, + new HashSet<>(Arrays.asList(0L, 1L)) + ); + + // case2: get the mapIdx range [1, 3) of partition0 + final Set expectedBlockIds3 = partitionToBlocksWithMapIdx.get(0).get(1).stream() + .map(x -> x.getBlockId()) + .collect(Collectors.toSet()); + expectedBlockIds3.addAll( + partitionToBlocksWithMapIdx.get(0).get(2).stream() + .map(x -> x.getBlockId()) + .collect(Collectors.toSet()) + ); + expectedBlockIds2.addAll(expectedBlockIds1); + final Map expectedData3 = expectedData.entrySet().stream() + .filter(x -> expectedBlockIds3.contains(x.getKey())) + .collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue())); + taskIds = Roaring64NavigableMap.bitmapOf(1, 2); + sdr = readShuffleData( + shuffleServerClient, + testAppId, + 0, + 0, + 1, + 10, + 1000, + 0, + new LocalOrderSegmentSplitter(taskIds, 1000) + ); + validate( + sdr, + expectedBlockIds3, + expectedData3, + new HashSet<>(Arrays.asList(1L, 2L)) + ); + + // case3: get the mapIdx range [0, Integer.MAX_VALUE) of partition0, it should always return all data + final Set expectedBlockIds4 = partitionToBlocks.get(0).stream() + .map(x -> x.getBlockId()) + .collect(Collectors.toSet()); + final Map expectedData4 = expectedData.entrySet().stream() + .filter(x -> expectedBlockIds4.contains(x.getKey())) + .collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue())); + taskIds = Roaring64NavigableMap.bitmapOf(); + for (long blockId : expectedBlockIds4) { + taskIds.add(new DefaultIdHelper().getTaskAttemptId(blockId)); + } + sdr = readShuffleData( + shuffleServerClient, + testAppId, + 0, + 0, + 1, + 10, + 10000, + 0, + new LocalOrderSegmentSplitter(taskIds, 100000) + ); + validate( + sdr, + expectedBlockIds4, + expectedData4, + new HashSet<>(Arrays.asList(0L, 1L, 2L)) + ); + } + + private void validate(ShuffleDataResult sdr, Set expectedBlockIds, + Map expectedData, Set expectedTaskAttemptIds) { + byte[] buffer = sdr.getData(); + List bufferSegments = sdr.getBufferSegments(); + int matched = 0; + for (BufferSegment bs : bufferSegments) { + if (expectedBlockIds.contains(bs.getBlockId())) { + byte[] data = new byte[bs.getLength()]; + System.arraycopy(buffer, bs.getOffset(), data, 0, bs.getLength()); + assertEquals(bs.getCrc(), ChecksumUtils.getCrc32(data)); + assertTrue(Arrays.equals(data, expectedData.get(bs.getBlockId()))); + assertTrue(expectedBlockIds.contains(bs.getBlockId())); + assertTrue(expectedTaskAttemptIds.contains(bs.getTaskAttemptId())); + matched++; + } else { + fail(); + } + } + assertEquals(expectedBlockIds.size(), matched); + } +} diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java index bf07502ad9..3f1422722f 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java @@ -58,7 +58,7 @@ public class ShuffleServerWithLocalTest extends ShuffleReadWriteBase { private ShuffleServerGrpcClient shuffleServerClient; @BeforeAll - public static void setupServers() throws Exception { + private static void setupServers() throws Exception { CoordinatorConf coordinatorConf = getCoordinatorConf(); createCoordinatorServer(coordinatorConf); ShuffleServerConf shuffleServerConf = getShuffleServerConf(); diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java index 8df1992610..0c98537863 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java @@ -42,6 +42,7 @@ import org.apache.uniffle.common.RemoteStorageInfo; import org.apache.uniffle.common.ShuffleAssignmentsInfo; import org.apache.uniffle.common.ShuffleBlockInfo; +import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.ShuffleServerInfo; import org.apache.uniffle.common.util.Constants; import org.apache.uniffle.common.util.RetryUtils; @@ -104,8 +105,14 @@ public void closeClient() { @Test public void rpcFailTest() throws Exception { String testAppId = "rpcFailTest"; - shuffleWriteClientImpl.registerShuffle(shuffleServerInfo1, - testAppId, 0, Lists.newArrayList(new PartitionRange(0, 0)), new RemoteStorageInfo("")); + shuffleWriteClientImpl.registerShuffle( + shuffleServerInfo1, + testAppId, + 0, + Lists.newArrayList(new PartitionRange(0, 0)), + new RemoteStorageInfo(""), + ShuffleDataDistributionType.NORMAL + ); Map expectedData = Maps.newHashMap(); Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf(); @@ -149,11 +156,23 @@ public void rpcFailTest() throws Exception { public void reportMultipleServerTest() throws Exception { String testAppId = "reportMultipleServerTest"; - shuffleWriteClientImpl.registerShuffle(shuffleServerInfo1, - testAppId, 1, Lists.newArrayList(new PartitionRange(1, 1)), new RemoteStorageInfo("")); - - shuffleWriteClientImpl.registerShuffle(shuffleServerInfo2, - testAppId, 1, Lists.newArrayList(new PartitionRange(2, 2)), new RemoteStorageInfo("")); + shuffleWriteClientImpl.registerShuffle( + shuffleServerInfo1, + testAppId, + 1, + Lists.newArrayList(new PartitionRange(1, 1)), + new RemoteStorageInfo(""), + ShuffleDataDistributionType.NORMAL + ); + + shuffleWriteClientImpl.registerShuffle( + shuffleServerInfo2, + testAppId, + 1, + Lists.newArrayList(new PartitionRange(2, 2)), + new RemoteStorageInfo(""), + ShuffleDataDistributionType.NORMAL + ); Map> partitionToServers = Maps.newHashMap(); partitionToServers.putIfAbsent(1, Lists.newArrayList(shuffleServerInfo1)); @@ -212,10 +231,22 @@ public void reportMultipleServerTest() throws Exception { @Test public void writeReadTest() throws Exception { String testAppId = "writeReadTest"; - shuffleWriteClientImpl.registerShuffle(shuffleServerInfo1, - testAppId, 0, Lists.newArrayList(new PartitionRange(0, 0)), new RemoteStorageInfo("")); - shuffleWriteClientImpl.registerShuffle(shuffleServerInfo2, - testAppId, 0, Lists.newArrayList(new PartitionRange(0, 0)), new RemoteStorageInfo("")); + shuffleWriteClientImpl.registerShuffle( + shuffleServerInfo1, + testAppId, + 0, + Lists.newArrayList(new PartitionRange(0, 0)), + new RemoteStorageInfo(""), + ShuffleDataDistributionType.NORMAL + ); + shuffleWriteClientImpl.registerShuffle( + shuffleServerInfo2, + testAppId, + 0, + Lists.newArrayList(new PartitionRange(0, 0)), + new RemoteStorageInfo(""), + ShuffleDataDistributionType.NORMAL + ); Map expectedData = Maps.newHashMap(); Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf(); Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0); @@ -263,8 +294,14 @@ public void writeReadTest() throws Exception { @Test public void emptyTaskTest() { String testAppId = "emptyTaskTest"; - shuffleWriteClientImpl.registerShuffle(shuffleServerInfo1, - testAppId, 0, Lists.newArrayList(new PartitionRange(0, 0)), new RemoteStorageInfo("")); + shuffleWriteClientImpl.registerShuffle( + shuffleServerInfo1, + testAppId, + 0, + Lists.newArrayList(new PartitionRange(0, 0)), + new RemoteStorageInfo(""), + ShuffleDataDistributionType.NORMAL + ); boolean commitResult = shuffleWriteClientImpl .sendCommit(Sets.newHashSet(shuffleServerInfo1), testAppId, 0, 2); assertTrue(commitResult); @@ -306,7 +343,13 @@ public void testRetryAssgin() throws Throwable { }); } shuffleWriteClientImpl.registerShuffle( - entry.getKey(), appId, 0, entry.getValue(), remoteStorage); + entry.getKey(), + appId, + 0, + entry.getValue(), + remoteStorage, + ShuffleDataDistributionType.NORMAL + ); }); return shuffleAssignments; }, heartbeatTimeout, maxTryTime); diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinTest.java index 50e0c27e6d..576f1d71d9 100644 --- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinTest.java +++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinTest.java @@ -44,7 +44,7 @@ public class AQESkewedJoinTest extends SparkIntegrationTestBase { @BeforeAll - public static void setupServers() throws Exception { + private static void setupServers() throws Exception { CoordinatorConf coordinatorConf = getCoordinatorConf(); Map dynamicConf = Maps.newHashMap(); dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), HDFS_URI + "rss/test"); diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinWithLocalOrderTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinWithLocalOrderTest.java new file mode 100644 index 0000000000..331f589800 --- /dev/null +++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinWithLocalOrderTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.test; + +import org.apache.spark.SparkConf; +import org.apache.spark.shuffle.RssSparkConfig; +import org.junit.jupiter.api.BeforeAll; + +import org.apache.uniffle.common.ShuffleDataDistributionType; +import org.apache.uniffle.common.config.RssClientConf; +import org.apache.uniffle.coordinator.CoordinatorConf; +import org.apache.uniffle.server.ShuffleServerConf; +import org.apache.uniffle.storage.util.StorageType; + +public class AQESkewedJoinWithLocalOrderTest extends AQESkewedJoinTest { + + @BeforeAll + public static void setupServers() throws Exception { + CoordinatorConf coordinatorConf = getCoordinatorConf(); + createCoordinatorServer(coordinatorConf); + ShuffleServerConf shuffleServerConf = getShuffleServerConf(); + // Use the LOCALFILE storage type to ensure the data will be flushed by local_order mechanism + shuffleServerConf.setString("rss.storage.type", StorageType.LOCALFILE.name()); + createShuffleServer(shuffleServerConf); + startServers(); + } + + @Override + public void updateSparkConfCustomer(SparkConf sparkConf) { + sparkConf.set(RssSparkConfig.RSS_STORAGE_TYPE.key(), "LOCALFILE"); + sparkConf.set("spark." + RssClientConf.DATA_DISTRIBUTION_TYPE.key(), + ShuffleDataDistributionType.LOCAL_ORDER.name()); + } +} diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java index f817b2ca41..3d54e50ae1 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java @@ -57,6 +57,7 @@ import org.apache.uniffle.common.PartitionRange; import org.apache.uniffle.common.RemoteStorageInfo; import org.apache.uniffle.common.ShuffleBlockInfo; +import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.exception.NotRetryException; import org.apache.uniffle.common.exception.RssException; import org.apache.uniffle.common.util.RetryUtils; @@ -131,12 +132,14 @@ private ShuffleRegisterResponse doRegisterShuffle( int shuffleId, List partitionRanges, RemoteStorageInfo remoteStorageInfo, - String user) { + String user, + ShuffleDataDistributionType dataDistributionType) { ShuffleRegisterRequest.Builder reqBuilder = ShuffleRegisterRequest.newBuilder(); reqBuilder .setAppId(appId) .setShuffleId(shuffleId) .setUser(user) + .setShuffleDataDistribution(RssProtos.DataDistribution.valueOf(dataDistributionType.name())) .addAllPartitionRanges(toShufflePartitionRanges(partitionRanges)); RemoteStorage.Builder rsBuilder = RemoteStorage.newBuilder(); rsBuilder.setPath(remoteStorageInfo.getPath()); @@ -245,7 +248,8 @@ public RssRegisterShuffleResponse registerShuffle(RssRegisterShuffleRequest requ request.getShuffleId(), request.getPartitionRanges(), request.getRemoteStorageInfo(), - request.getUser() + request.getUser(), + request.getDataDistributionType() ); RssRegisterShuffleResponse response; diff --git a/internal-client/src/main/java/org/apache/uniffle/client/request/RssRegisterShuffleRequest.java b/internal-client/src/main/java/org/apache/uniffle/client/request/RssRegisterShuffleRequest.java index 249d7136e7..4365a4b104 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/request/RssRegisterShuffleRequest.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/request/RssRegisterShuffleRequest.java @@ -23,6 +23,7 @@ import org.apache.uniffle.common.PartitionRange; import org.apache.uniffle.common.RemoteStorageInfo; +import org.apache.uniffle.common.ShuffleDataDistributionType; public class RssRegisterShuffleRequest { @@ -31,18 +32,21 @@ public class RssRegisterShuffleRequest { private List partitionRanges; private RemoteStorageInfo remoteStorageInfo; private String user; + private ShuffleDataDistributionType dataDistributionType; public RssRegisterShuffleRequest( String appId, int shuffleId, List partitionRanges, RemoteStorageInfo remoteStorageInfo, - String user) { + String user, + ShuffleDataDistributionType dataDistributionType) { this.appId = appId; this.shuffleId = shuffleId; this.partitionRanges = partitionRanges; this.remoteStorageInfo = remoteStorageInfo; this.user = user; + this.dataDistributionType = dataDistributionType; } public RssRegisterShuffleRequest( @@ -50,7 +54,13 @@ public RssRegisterShuffleRequest( int shuffleId, List partitionRanges, String remoteStoragePath) { - this(appId, shuffleId, partitionRanges, new RemoteStorageInfo(remoteStoragePath), StringUtils.EMPTY); + this(appId, + shuffleId, + partitionRanges, + new RemoteStorageInfo(remoteStoragePath), + StringUtils.EMPTY, + ShuffleDataDistributionType.NORMAL + ); } public String getAppId() { @@ -72,4 +82,8 @@ public RemoteStorageInfo getRemoteStorageInfo() { public String getUser() { return user; } + + public ShuffleDataDistributionType getDataDistributionType() { + return dataDistributionType; + } } diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto index 4555a194da..4a4077cfd9 100644 --- a/proto/src/main/proto/Rss.proto +++ b/proto/src/main/proto/Rss.proto @@ -167,6 +167,12 @@ message ShuffleRegisterRequest { repeated ShufflePartitionRange partitionRanges = 3; RemoteStorage remoteStorage = 4; string user = 5; + DataDistribution shuffleDataDistribution = 6; +} + +enum DataDistribution { + NORMAL = 0; + LOCAL_ORDER = 1; } message ShuffleUnregisterRequest { diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java index 3c2a190a2b..45e329d137 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java @@ -36,6 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.ShufflePartitionedBlock; import org.apache.uniffle.common.config.RssBaseConf; import org.apache.uniffle.common.util.ThreadUtils; @@ -359,4 +360,8 @@ public long getCreateTimeStamp() { return createTimeStamp; } } + + public ShuffleDataDistributionType getDataDistributionType(String appId) { + return shuffleServer.getShuffleTaskManager().getDataDistributionType(appId); + } } diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java index 3980cc6571..6af12b7659 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java @@ -19,6 +19,7 @@ import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; import com.google.common.collect.Lists; @@ -35,6 +36,7 @@ import org.apache.uniffle.common.BufferSegment; import org.apache.uniffle.common.PartitionRange; import org.apache.uniffle.common.RemoteStorageInfo; +import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.ShuffleDataResult; import org.apache.uniffle.common.ShuffleIndexResult; import org.apache.uniffle.common.ShufflePartitionedBlock; @@ -138,6 +140,14 @@ public void registerShuffle(ShuffleRegisterRequest req, String remoteStoragePath = req.getRemoteStorage().getPath(); String user = req.getUser(); + ShuffleDataDistributionType shuffleDataDistributionType = + ShuffleDataDistributionType.valueOf( + Optional + .ofNullable(req.getShuffleDataDistribution()) + .orElse(RssProtos.DataDistribution.NORMAL) + .name() + ); + Map remoteStorageConf = req .getRemoteStorage() .getRemoteStorageConfList() @@ -156,7 +166,8 @@ public void registerShuffle(ShuffleRegisterRequest req, shuffleId, partitionRanges, new RemoteStorageInfo(remoteStoragePath, remoteStorageConf), - user + user, + shuffleDataDistributionType ); reply = ShuffleRegisterResponse diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java index 963a57b8e3..aeb008873d 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java @@ -24,6 +24,8 @@ import com.google.common.collect.Maps; import org.roaringbitmap.longlong.Roaring64NavigableMap; +import org.apache.uniffle.common.ShuffleDataDistributionType; + /** * ShuffleTaskInfo contains the information of submitting the shuffle, * the information of the cache block, user and timestamp corresponding to the app @@ -42,12 +44,15 @@ public class ShuffleTaskInfo { private Map cachedBlockIds; private AtomicReference user; + private AtomicReference dataDistType; + public ShuffleTaskInfo() { this.currentTimes = System.currentTimeMillis(); this.commitCounts = Maps.newConcurrentMap(); this.commitLocks = Maps.newConcurrentMap(); this.cachedBlockIds = Maps.newConcurrentMap(); this.user = new AtomicReference<>(); + this.dataDistType = new AtomicReference<>(); } public Long getCurrentTimes() { @@ -77,4 +82,13 @@ public String getUser() { public void setUser(String user) { this.user.set(user); } + + public void setDataDistType( + ShuffleDataDistributionType dataDistType) { + this.dataDistType.set(dataDistType); + } + + public ShuffleDataDistributionType getDataDistType() { + return dataDistType.get(); + } } diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java index 2ec3d6c967..504b791fa9 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java @@ -45,6 +45,7 @@ import org.apache.uniffle.common.PartitionRange; import org.apache.uniffle.common.RemoteStorageInfo; +import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.ShuffleDataResult; import org.apache.uniffle.common.ShuffleIndexResult; import org.apache.uniffle.common.ShufflePartitionedBlock; @@ -134,14 +135,36 @@ public ShuffleTaskManager( thread.start(); } + /** + * Only for test + */ + @VisibleForTesting public StatusCode registerShuffle( String appId, int shuffleId, List partitionRanges, RemoteStorageInfo remoteStorageInfo, String user) { + return registerShuffle( + appId, + shuffleId, + partitionRanges, + remoteStorageInfo, + user, + ShuffleDataDistributionType.NORMAL + ); + } + + public StatusCode registerShuffle( + String appId, + int shuffleId, + List partitionRanges, + RemoteStorageInfo remoteStorageInfo, + String user, + ShuffleDataDistributionType dataDistType) { refreshAppId(appId); shuffleTaskInfos.get(appId).setUser(user); + shuffleTaskInfos.get(appId).setDataDistType(dataDistType); partitionsToBlockIds.putIfAbsent(appId, Maps.newConcurrentMap()); for (PartitionRange partitionRange : partitionRanges) { shuffleBufferManager.registerBuffer(appId, shuffleId, partitionRange.getStart(), partitionRange.getEnd()); @@ -502,4 +525,8 @@ public void removeShuffleDataAsync(String appId, int shuffleId) { void removeShuffleDataSync(String appId, int shuffleId) { removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId)); } + + public ShuffleDataDistributionType getDataDistributionType(String appId) { + return shuffleTaskInfos.get(appId).getDataDistType(); + } } diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java index b251113f61..724fc6bcb9 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java @@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory; import org.apache.uniffle.common.BufferSegment; +import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.ShuffleDataResult; import org.apache.uniffle.common.ShufflePartitionedBlock; import org.apache.uniffle.common.ShufflePartitionedData; @@ -74,12 +75,16 @@ public synchronized ShuffleDataFlushEvent toFlushEvent( int shuffleId, int startPartition, int endPartition, - Supplier isValid) { + Supplier isValid, + ShuffleDataDistributionType dataDistributionType) { if (blocks.isEmpty()) { return null; } // buffer will be cleared, and new list must be created for async flush List spBlocks = new LinkedList<>(blocks); + if (dataDistributionType == ShuffleDataDistributionType.LOCAL_ORDER) { + spBlocks.sort((o1, o2) -> o1.getTaskAttemptId() - o2.getTaskAttemptId() > 0 ? 1 : -1); + } long eventId = ShuffleFlushManager.ATOMIC_EVENT_ID.getAndIncrement(); final ShuffleDataFlushEvent event = new ShuffleDataFlushEvent( eventId, @@ -97,6 +102,18 @@ public synchronized ShuffleDataFlushEvent toFlushEvent( return event; } + /** + * Only for test + */ + public synchronized ShuffleDataFlushEvent toFlushEvent( + String appId, + int shuffleId, + int startPartition, + int endPartition, + Supplier isValid) { + return toFlushEvent(appId, shuffleId, startPartition, endPartition, isValid, ShuffleDataDistributionType.NORMAL); + } + public List getBlocks() { return blocks; } diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java index 99ae9c6bc3..f87ae896ea 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java @@ -195,8 +195,14 @@ public synchronized void commitShuffleTask(String appId, int shuffleId) { protected void flushBuffer(ShuffleBuffer buffer, String appId, int shuffleId, int startPartition, int endPartition) { ShuffleDataFlushEvent event = - buffer.toFlushEvent(appId, shuffleId, startPartition, endPartition, - () -> bufferPool.containsKey(appId)); + buffer.toFlushEvent( + appId, + shuffleId, + startPartition, + endPartition, + () -> bufferPool.containsKey(appId), + shuffleFlushManager.getDataDistributionType(appId) + ); if (event != null) { updateShuffleSize(appId, shuffleId, -event.getSize()); inFlushSize.addAndGet(event.getSize()); diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java index 5a869b4398..f557531f79 100644 --- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java @@ -82,8 +82,7 @@ public void registerShuffleTest() throws Exception { conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L); conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false); ShuffleServer shuffleServer = new ShuffleServer(conf); - ShuffleTaskManager shuffleTaskManager = new ShuffleTaskManager(conf, - shuffleServer.getShuffleFlushManager(), shuffleServer.getShuffleBufferManager(), null); + ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager(); String appId = "registerTest1"; int shuffleId = 1; @@ -138,11 +137,7 @@ public void writeProcessTest() throws Exception { conf.set(ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED, 3000L); conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false); ShuffleServer shuffleServer = new ShuffleServer(conf); - ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager(); - ShuffleFlushManager shuffleFlushManager = shuffleServer.getShuffleFlushManager(); - StorageManager storageManager = shuffleServer.getStorageManager(); - ShuffleTaskManager shuffleTaskManager = new ShuffleTaskManager( - conf, shuffleFlushManager, shuffleBufferManager, storageManager); + ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager(); shuffleTaskManager.registerShuffle( appId, shuffleId, @@ -184,6 +179,8 @@ public void writeProcessTest() throws Exception { assertEquals(1, bufferIds.size()); assertEquals(StatusCode.SUCCESS, sc); shuffleTaskManager.commitShuffle(appId, shuffleId); + + ShuffleFlushManager shuffleFlushManager = shuffleServer.getShuffleFlushManager(); assertEquals(1, shuffleFlushManager.getCommittedBlockIds(appId, shuffleId).getLongCardinality()); // flush for partition 1-1 @@ -279,11 +276,7 @@ public void removeShuffleDataWithHdfsTest() throws Exception { ShuffleServer shuffleServer = new ShuffleServer(conf); - ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager(); - ShuffleFlushManager shuffleFlushManager = shuffleServer.getShuffleFlushManager(); - StorageManager storageManager = shuffleServer.getStorageManager(); - ShuffleTaskManager shuffleTaskManager = new ShuffleTaskManager( - conf, shuffleFlushManager, shuffleBufferManager, storageManager); + ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager(); String appId = "removeShuffleDataTest1"; for (int i = 0; i < 4; i++) { @@ -312,6 +305,7 @@ public void removeShuffleDataWithHdfsTest() throws Exception { assertEquals(1, shuffleTaskManager.getAppIds().size()); + ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager(); RangeMap rangeMap = shuffleBufferManager.getBufferPool().get(appId).get(0); assertFalse(rangeMap.asMapOfRanges().isEmpty()); shuffleTaskManager.commitShuffle(appId, 0); @@ -351,12 +345,7 @@ public void removeShuffleDataWithLocalfileTest() throws Exception { path1.toAbsolutePath().toString() + "," + path2.toAbsolutePath().toString()); ShuffleServer shuffleServer = new ShuffleServer(conf); - - ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager(); - ShuffleFlushManager shuffleFlushManager = shuffleServer.getShuffleFlushManager(); - StorageManager storageManager = shuffleServer.getStorageManager(); - ShuffleTaskManager shuffleTaskManager = new ShuffleTaskManager( - conf, shuffleFlushManager, shuffleBufferManager, storageManager); + ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager(); String appId = "removeShuffleDataWithLocalfileTest"; @@ -411,11 +400,7 @@ public void clearTest() throws Exception { conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false); ShuffleServer shuffleServer = new ShuffleServer(conf); - ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager(); - ShuffleFlushManager shuffleFlushManager = shuffleServer.getShuffleFlushManager(); - StorageManager storageManager = shuffleServer.getStorageManager(); - ShuffleTaskManager shuffleTaskManager = new ShuffleTaskManager(conf, shuffleFlushManager, - shuffleBufferManager, storageManager); + ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager(); shuffleTaskManager.registerShuffle( "clearTest1", shuffleId, diff --git a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java index 50c91f5a4e..1bee2d9ab7 100644 --- a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java @@ -54,6 +54,8 @@ public class ShuffleBufferManagerTest extends BufferTestBase { private ShuffleBufferManager shuffleBufferManager; private ShuffleFlushManager mockShuffleFlushManager; + private ShuffleServer mockShuffleServer; + private ShuffleTaskManager mockShuffleTaskManager; private ShuffleServerConf conf; @BeforeEach @@ -68,6 +70,9 @@ public void setUp() { conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 80.0); conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 1024L); mockShuffleFlushManager = mock(ShuffleFlushManager.class); + mockShuffleServer = mock(ShuffleServer.class); + mockShuffleTaskManager = mock(ShuffleTaskManager.class); + when(mockShuffleServer.getShuffleTaskManager()).thenReturn(mockShuffleTaskManager); shuffleBufferManager = new ShuffleBufferManager(conf, mockShuffleFlushManager); } diff --git a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java index 9dcb280c1e..dbe4ac3e2b 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java +++ b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java @@ -118,10 +118,12 @@ private ClientReadHandler getLocalfileClientReaderHandler(CreateShuffleReadHandl List shuffleServerClients = shuffleServerInfoList.stream().map( ssi -> ShuffleServerClientFactory.getInstance().getShuffleServerClient(ClientType.GRPC.name(), ssi)).collect( Collectors.toList()); - return new LocalFileQuorumClientReadHandler(request.getAppId(), request.getShuffleId(), request.getPartitionId(), + return new LocalFileQuorumClientReadHandler( + request.getAppId(), request.getShuffleId(), request.getPartitionId(), request.getIndexReadLimit(), request.getPartitionNumPerRange(), request.getPartitionNum(), request.getReadBufferSize(), request.getExpectBlockIds(), request.getProcessBlockIds(), - shuffleServerClients); + shuffleServerClients, request.getDistributionType(), request.getExpectTaskIds() + ); } private ClientReadHandler getHdfsClientReadHandler(CreateShuffleReadHandlerRequest request) { @@ -136,7 +138,10 @@ private ClientReadHandler getHdfsClientReadHandler(CreateShuffleReadHandlerReque request.getExpectBlockIds(), request.getProcessBlockIds(), request.getStorageBasePath(), - request.getHadoopConf()); + request.getHadoopConf(), + request.getDistributionType(), + request.getExpectTaskIds() + ); } public ShuffleDeleteHandler createShuffleDeleteHandler(CreateShuffleDeleteHandlerRequest request) { diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/DataSkippableReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/DataSkippableReadHandler.java index ce7a515f61..1cf7dfd479 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/DataSkippableReadHandler.java +++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/DataSkippableReadHandler.java @@ -24,10 +24,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.ShuffleDataResult; import org.apache.uniffle.common.ShuffleDataSegment; import org.apache.uniffle.common.ShuffleIndexResult; -import org.apache.uniffle.common.util.RssUtils; +import org.apache.uniffle.common.segment.SegmentSplitterFactory; public abstract class DataSkippableReadHandler extends AbstractClientReadHandler { private static final Logger LOG = LoggerFactory.getLogger(DataSkippableReadHandler.class); @@ -38,19 +39,26 @@ public abstract class DataSkippableReadHandler extends AbstractClientReadHandler protected Roaring64NavigableMap expectBlockIds; protected Roaring64NavigableMap processBlockIds; + protected ShuffleDataDistributionType distributionType; + protected Roaring64NavigableMap expectTaskIds; + public DataSkippableReadHandler( String appId, int shuffleId, int partitionId, int readBufferSize, Roaring64NavigableMap expectBlockIds, - Roaring64NavigableMap processBlockIds) { + Roaring64NavigableMap processBlockIds, + ShuffleDataDistributionType distributionType, + Roaring64NavigableMap expectTaskIds) { this.appId = appId; this.shuffleId = shuffleId; this.partitionId = partitionId; this.readBufferSize = readBufferSize; this.expectBlockIds = expectBlockIds; this.processBlockIds = processBlockIds; + this.distributionType = distributionType; + this.expectTaskIds = expectTaskIds; } protected abstract ShuffleIndexResult readShuffleIndex(); @@ -64,7 +72,11 @@ public ShuffleDataResult readShuffleData() { return null; } - shuffleDataSegments = RssUtils.transIndexDataToSegments(shuffleIndexResult, readBufferSize); + shuffleDataSegments = + SegmentSplitterFactory + .getInstance() + .get(distributionType, expectTaskIds, readBufferSize) + .split(shuffleIndexResult); } // We should skip unexpected and processed segments when handler is read diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java index 872672cc21..a92089603a 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java +++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java @@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory; import org.apache.uniffle.common.BufferSegment; +import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.ShuffleDataResult; import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider; import org.apache.uniffle.common.util.Constants; @@ -53,6 +54,9 @@ public class HdfsClientReadHandler extends AbstractClientReadHandler { private long readLength = 0L; private long readUncompressLength = 0L; + private ShuffleDataDistributionType distributionType; + private Roaring64NavigableMap expectTaskIds; + public HdfsClientReadHandler( String appId, int shuffleId, @@ -64,7 +68,9 @@ public HdfsClientReadHandler( Roaring64NavigableMap expectBlockIds, Roaring64NavigableMap processBlockIds, String storageBasePath, - Configuration hadoopConf) { + Configuration hadoopConf, + ShuffleDataDistributionType distributionType, + Roaring64NavigableMap expectTaskIds) { this.appId = appId; this.shuffleId = shuffleId; this.partitionId = partitionId; @@ -76,6 +82,26 @@ public HdfsClientReadHandler( this.storageBasePath = storageBasePath; this.hadoopConf = hadoopConf; this.readHandlerIndex = 0; + this.distributionType = distributionType; + this.expectTaskIds = expectTaskIds; + } + + // Only for test + public HdfsClientReadHandler( + String appId, + int shuffleId, + int partitionId, + int indexReadLimit, + int partitionNumPerRange, + int partitionNum, + int readBufferSize, + Roaring64NavigableMap expectBlockIds, + Roaring64NavigableMap processBlockIds, + String storageBasePath, + Configuration hadoopConf) { + this(appId, shuffleId, partitionId, indexReadLimit, partitionNumPerRange, partitionNum, readBufferSize, + expectBlockIds, processBlockIds, storageBasePath, hadoopConf, ShuffleDataDistributionType.NORMAL, + Roaring64NavigableMap.bitmapOf()); } protected void init(String fullShufflePath) { @@ -107,7 +133,8 @@ protected void init(String fullShufflePath) { try { HdfsShuffleReadHandler handler = new HdfsShuffleReadHandler( appId, shuffleId, partitionId, filePrefix, - readBufferSize, expectBlockIds, processBlockIds, hadoopConf); + readBufferSize, expectBlockIds, processBlockIds, hadoopConf, + distributionType, expectTaskIds); readHandlers.add(handler); } catch (Exception e) { LOG.warn("Can't create ShuffleReaderHandler for " + filePrefix, e); diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java index af94723f25..810ff1bda5 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java +++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java @@ -26,6 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.ShuffleDataResult; import org.apache.uniffle.common.ShuffleDataSegment; import org.apache.uniffle.common.ShuffleIndexResult; @@ -51,13 +52,30 @@ public HdfsShuffleReadHandler( int readBufferSize, Roaring64NavigableMap expectBlockIds, Roaring64NavigableMap processBlockIds, - Configuration conf) throws Exception { - super(appId, shuffleId, partitionId, readBufferSize, expectBlockIds, processBlockIds); + Configuration conf, + ShuffleDataDistributionType distributionType, + Roaring64NavigableMap expectTaskIds) throws Exception { + super(appId, shuffleId, partitionId, readBufferSize, expectBlockIds, processBlockIds, + distributionType, expectTaskIds); this.filePrefix = filePrefix; this.indexReader = createHdfsReader(ShuffleStorageUtils.generateIndexFileName(filePrefix), conf); this.dataReader = createHdfsReader(ShuffleStorageUtils.generateDataFileName(filePrefix), conf); } + // Only for test + public HdfsShuffleReadHandler( + String appId, + int shuffleId, + int partitionId, + String filePrefix, + int readBufferSize, + Roaring64NavigableMap expectBlockIds, + Roaring64NavigableMap processBlockIds, + Configuration conf) throws Exception { + this(appId, shuffleId, partitionId, filePrefix, readBufferSize, expectBlockIds, + processBlockIds, conf, ShuffleDataDistributionType.NORMAL, Roaring64NavigableMap.bitmapOf()); + } + @Override protected ShuffleIndexResult readShuffleIndex() { long start = System.currentTimeMillis(); diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java index a5c42c29a1..a630cf1ba3 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java +++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java @@ -25,6 +25,7 @@ import org.apache.uniffle.client.request.RssGetShuffleDataRequest; import org.apache.uniffle.client.request.RssGetShuffleIndexRequest; import org.apache.uniffle.client.response.RssGetShuffleDataResponse; +import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.ShuffleDataResult; import org.apache.uniffle.common.ShuffleDataSegment; import org.apache.uniffle.common.ShuffleIndexResult; @@ -47,8 +48,13 @@ public class LocalFileClientReadHandler extends DataSkippableReadHandler { int readBufferSize, Roaring64NavigableMap expectBlockIds, Roaring64NavigableMap processBlockIds, - ShuffleServerClient shuffleServerClient) { - super(appId, shuffleId, partitionId, readBufferSize, expectBlockIds, processBlockIds); + ShuffleServerClient shuffleServerClient, + ShuffleDataDistributionType distributionType, + Roaring64NavigableMap expectTaskIds) { + super( + appId, shuffleId, partitionId, readBufferSize, expectBlockIds, + processBlockIds, distributionType, expectTaskIds + ); this.shuffleServerClient = shuffleServerClient; this.partitionNumPerRange = partitionNumPerRange; this.partitionNum = partitionNum; diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileQuorumClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileQuorumClientReadHandler.java index 8cf1fe2ae3..d523cd6306 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileQuorumClientReadHandler.java +++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileQuorumClientReadHandler.java @@ -26,6 +26,7 @@ import org.apache.uniffle.client.api.ShuffleServerClient; import org.apache.uniffle.common.BufferSegment; +import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.ShuffleDataResult; import org.apache.uniffle.common.exception.RssException; @@ -49,7 +50,9 @@ public LocalFileQuorumClientReadHandler( int readBufferSize, Roaring64NavigableMap expectBlockIds, Roaring64NavigableMap processBlockIds, - List shuffleServerClients) { + List shuffleServerClients, + ShuffleDataDistributionType distributionType, + Roaring64NavigableMap expectTaskIds) { this.appId = appId; this.shuffleId = shuffleId; this.partitionId = partitionId; @@ -65,11 +68,34 @@ public LocalFileQuorumClientReadHandler( readBufferSize, expectBlockIds, processBlockIds, - client + client, + distributionType, + expectBlockIds )); } } + /** + * Only for test + */ + public LocalFileQuorumClientReadHandler( + String appId, + int shuffleId, + int partitionId, + int indexReadLimit, + int partitionNumPerRange, + int partitionNum, + int readBufferSize, + Roaring64NavigableMap expectBlockIds, + Roaring64NavigableMap processBlockIds, + List shuffleServerClients) { + this( + appId, shuffleId, partitionId, indexReadLimit, partitionNumPerRange, + partitionNum, readBufferSize, expectBlockIds, processBlockIds, + shuffleServerClients, ShuffleDataDistributionType.NORMAL, Roaring64NavigableMap.bitmapOf() + ); + } + @Override public ShuffleDataResult readShuffleData() { boolean readSuccessful = false; diff --git a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java index 046089c9bd..75a1f14654 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java +++ b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration; import org.roaringbitmap.longlong.Roaring64NavigableMap; +import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.ShuffleServerInfo; import org.apache.uniffle.common.config.RssBaseConf; @@ -41,6 +42,8 @@ public class CreateShuffleReadHandlerRequest { private List shuffleServerInfoList; private Roaring64NavigableMap expectBlockIds; private Roaring64NavigableMap processBlockIds; + private ShuffleDataDistributionType distributionType; + private Roaring64NavigableMap expectTaskIds; public CreateShuffleReadHandlerRequest() { } @@ -156,4 +159,20 @@ public void setProcessBlockIds(Roaring64NavigableMap processBlockIds) { public Roaring64NavigableMap getProcessBlockIds() { return processBlockIds; } + + public ShuffleDataDistributionType getDistributionType() { + return distributionType; + } + + public void setDistributionType(ShuffleDataDistributionType distributionType) { + this.distributionType = distributionType; + } + + public Roaring64NavigableMap getExpectTaskIds() { + return expectTaskIds; + } + + public void setExpectTaskIds(Roaring64NavigableMap expectTaskIds) { + this.expectTaskIds = expectTaskIds; + } } diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTestBase.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTestBase.java index ae7f054177..5c1b5294ff 100644 --- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTestBase.java +++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTestBase.java @@ -32,8 +32,8 @@ import org.apache.uniffle.common.ShuffleDataSegment; import org.apache.uniffle.common.ShuffleIndexResult; import org.apache.uniffle.common.ShufflePartitionedBlock; +import org.apache.uniffle.common.segment.FixedSizeSegmentSplitter; import org.apache.uniffle.common.util.ChecksumUtils; -import org.apache.uniffle.common.util.RssUtils; import org.apache.uniffle.storage.common.FileBasedShuffleSegment; import org.apache.uniffle.storage.handler.api.ServerReadHandler; import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler; @@ -97,8 +97,7 @@ public static List readData(ServerReadHandler readHandler, Sh return shuffleDataResults; } - List shuffleDataSegments = - RssUtils.transIndexDataToSegments(shuffleIndexResult, 32); + List shuffleDataSegments = new FixedSizeSegmentSplitter(32).split(shuffleIndexResult); for (ShuffleDataSegment shuffleDataSegment : shuffleDataSegments) { byte[] shuffleData = diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java index d25f44f4e0..e2dcfd5bcc 100644 --- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java +++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java @@ -33,6 +33,7 @@ import org.apache.uniffle.client.response.ResponseStatusCode; import org.apache.uniffle.client.response.RssGetShuffleDataResponse; import org.apache.uniffle.client.response.RssGetShuffleIndexResponse; +import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.ShuffleDataResult; import org.apache.uniffle.common.ShufflePartitionedBlock; import org.apache.uniffle.storage.common.FileBasedShuffleSegment; @@ -95,7 +96,8 @@ public void testDataInconsistent() throws Exception { Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf(); LocalFileClientReadHandler handler = new LocalFileClientReadHandler(appId, partitionId, shuffleId, -1, 1, 1, - readBufferSize, expectBlockIds, processBlockIds, mockShuffleServerClient); + readBufferSize, expectBlockIds, processBlockIds, mockShuffleServerClient, + ShuffleDataDistributionType.NORMAL, Roaring64NavigableMap.bitmapOf()); int totalSegment = ((blockSize * actualWriteDataBlock) / bytesPerSegment) + 1; int readBlocks = 0; for (int i = 0; i < totalSegment; i++) {