diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleClientWithLocalMultiDiskTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleClientWithLocalMultiDiskTest.java new file mode 100644 index 0000000000..e32a52fcc7 --- /dev/null +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleClientWithLocalMultiDiskTest.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.test; + +import java.io.File; +import java.util.List; +import java.util.Map; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.io.Files; +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.roaringbitmap.longlong.Roaring64NavigableMap; + +import org.apache.uniffle.client.TestUtils; +import org.apache.uniffle.client.impl.ShuffleReadClientImpl; +import org.apache.uniffle.client.impl.ShuffleWriteClientImpl; +import org.apache.uniffle.client.response.SendShuffleDataResult; +import org.apache.uniffle.client.util.DefaultIdHelper; +import org.apache.uniffle.common.ClientType; +import org.apache.uniffle.common.PartitionRange; +import org.apache.uniffle.common.RemoteStorageInfo; +import org.apache.uniffle.common.ShuffleBlockInfo; +import org.apache.uniffle.common.ShuffleDataDistributionType; +import org.apache.uniffle.common.ShuffleServerInfo; +import org.apache.uniffle.coordinator.CoordinatorConf; +import org.apache.uniffle.server.ShuffleDataReadEvent; +import org.apache.uniffle.server.ShuffleServer; +import org.apache.uniffle.server.ShuffleServerConf; +import org.apache.uniffle.storage.common.LocalStorage; +import org.apache.uniffle.storage.util.StorageType; + +import static org.apache.uniffle.server.ShuffleServerConf.RSS_LOCAL_STORAGE_MULTIPLE_DISK_SELECTION_ENABLE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * This class is to test the {@link org.apache.uniffle.server.storage.local.ChainableLocalStorageSelector} multiple + * disk selection strategy for reader. + */ +public class ShuffleClientWithLocalMultiDiskTest extends ShuffleReadWriteBase { + + private static ShuffleServerInfo shuffleServerInfo; + private ShuffleWriteClientImpl shuffleWriteClientImpl; + private ShuffleReadClientImpl shuffleReadClientImpl; + + @BeforeAll + public static void setupShuffleServers() throws Exception { + CoordinatorConf coordinatorConf = getCoordinatorConf(); + createCoordinatorServer(coordinatorConf); + + ShuffleServerConf shuffleServerConf = getShuffleServerConf(); + shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 4000000); + File tmpDir = Files.createTempDir(); + tmpDir.deleteOnExit(); + File dataDir1 = new File(tmpDir, "data1"); + File dataDir2 = new File(tmpDir, "data2"); + String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath(); + System.out.println("base: " + basePath); + shuffleServerConf.setString("rss.storage.type", StorageType.MEMORY_LOCALFILE.name()); + shuffleServerConf.setString("rss.storage.basePath", basePath); + shuffleServerConf.setLong(ShuffleServerConf.DISK_CAPACITY, 10 * 1024 * 1024); + shuffleServerConf.set(RSS_LOCAL_STORAGE_MULTIPLE_DISK_SELECTION_ENABLE, true); + createShuffleServer(shuffleServerConf); + startServers(); + shuffleServerInfo = + new ShuffleServerInfo("127.0.0.1-20001", shuffleServers.get(0).getIp(), SHUFFLE_SERVER_PORT); + } + + @BeforeEach + public void createClient() { + shuffleWriteClientImpl = new ShuffleWriteClientImpl( + ClientType.GRPC.name(), 3, 1000, 1, + 1, 1, 1, true, 1, 1, 10, 10); + } + + @AfterEach + public void closeClient() { + shuffleWriteClientImpl.close(); + } + + @Test + public void testClientRemoteReadFromMultipleDisk() throws Exception { + String appId = "testClientRemoteReadFromMultipleDisk_appId"; + shuffleWriteClientImpl.registerShuffle( + shuffleServerInfo, + appId, + 0, + Lists.newArrayList(new PartitionRange(0, 0)), + new RemoteStorageInfo(""), + ShuffleDataDistributionType.NORMAL + ); + + Map expectedData = Maps.newHashMap(); + Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf(); + + // First committing, blocks will be written to one disk + List blocks = createShuffleBlockList( + 0, 0, 0, 3, 25, blockIdBitmap, + expectedData, Lists.newArrayList(shuffleServerInfo)); + SendShuffleDataResult result = shuffleWriteClientImpl.sendShuffleData(appId, blocks, () -> false); + assertEquals(0, result.getFailedBlockIds().size()); + assertEquals(3, result.getSuccessBlockIds().size()); + + boolean commitResult = shuffleWriteClientImpl.sendCommit(Sets.newHashSet(shuffleServerInfo), appId, 0, 1); + assertTrue(commitResult); + + // Mark one storage reaching high watermark, it should switch another storage for next writing + ShuffleServer shuffleServer = shuffleServers.get(0); + ShuffleDataReadEvent readEvent = new ShuffleDataReadEvent(appId, 0, 0, 0, 0); + LocalStorage storage1 = (LocalStorage) shuffleServer.getStorageManager().selectStorage(readEvent); + storage1.getMetaData().setSize(20 * 1024 * 1024); + + blocks = createShuffleBlockList( + 0, 0, 0, 3, 25, blockIdBitmap, + expectedData, Lists.newArrayList(shuffleServerInfo)); + result = shuffleWriteClientImpl.sendShuffleData(appId, blocks, () -> false); + assertEquals(0, result.getFailedBlockIds().size()); + assertEquals(3, result.getSuccessBlockIds().size()); + commitResult = shuffleWriteClientImpl.sendCommit(Sets.newHashSet(shuffleServerInfo), appId, 0, 1); + assertTrue(commitResult); + + readEvent = new ShuffleDataReadEvent(appId, 0, 0, 0, 1); + LocalStorage storage2 = (LocalStorage) shuffleServer.getStorageManager().selectStorage(readEvent); + assertNotEquals(storage1, storage2); + + /** + * String storageType, + * String appId, + * int shuffleId, + * int partitionId, + * int indexReadLimit, + * int partitionNumPerRange, + * int partitionNum, + * int readBufferSize, + * String storageBasePath, + * Roaring64NavigableMap blockIdBitmap, + * Roaring64NavigableMap taskIdBitmap, + * List shuffleServerInfoList, + * Configuration hadoopConf, + * IdHelper idHelper) { + */ + ShuffleReadClientImpl readClient = new ShuffleReadClientImpl( + StorageType.LOCALFILE.name(), + appId, + 0, + 0, + 100, + 1, + 1, + 1000, + "", + blockIdBitmap, + Roaring64NavigableMap.bitmapOf(0), + Lists.newArrayList(shuffleServerInfo), + new Configuration(), + new DefaultIdHelper() + ); + + TestUtils.validateResult(readClient, expectedData); + readClient.checkProcessedBlockIds(); + readClient.close(); + } +} diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java index ca0afadcda..f6caa9f291 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java @@ -130,7 +130,7 @@ public static List readShuffleIndexSegments( int readBufferSize) { // read index file RssGetShuffleIndexRequest rgsir = new RssGetShuffleIndexRequest( - appId, shuffleId, partitionId, partitionNumPerRange, partitionNum); + appId, shuffleId, partitionId, partitionNumPerRange, partitionNum, 0); ShuffleIndexResult shuffleIndexResult = shuffleServerClient.getShuffleIndex(rgsir).getShuffleIndexResult(); return new FixedSizeSegmentSplitter(readBufferSize).split(shuffleIndexResult); } @@ -152,7 +152,7 @@ public static ShuffleDataResult readShuffleData( ShuffleDataSegment segment = sds.get(segmentIndex); RssGetShuffleDataRequest rgsdr = new RssGetShuffleDataRequest( appId, shuffleId, partitionId, partitionNumPerRange, partitionNum, - segment.getOffset(), segment.getLength()); + segment.getOffset(), segment.getLength(), 0); return new ShuffleDataResult( shuffleServerClient.getShuffleData(rgsdr).getShuffleData(), @@ -170,7 +170,7 @@ public static ShuffleDataResult readShuffleData( int segmentIndex, SegmentSplitter segmentSplitter) { RssGetShuffleIndexRequest rgsir = new RssGetShuffleIndexRequest( - appId, shuffleId, partitionId, partitionNumPerRange, partitionNum); + appId, shuffleId, partitionId, partitionNumPerRange, partitionNum, 0); ShuffleIndexResult shuffleIndexResult = shuffleServerClient.getShuffleIndex(rgsir).getShuffleIndexResult(); if (shuffleIndexResult == null) { return new ShuffleDataResult(); @@ -184,7 +184,7 @@ public static ShuffleDataResult readShuffleData( ShuffleDataSegment segment = sds.get(segmentIndex); RssGetShuffleDataRequest rgsdr = new RssGetShuffleDataRequest( appId, shuffleId, partitionId, partitionNumPerRange, partitionNum, - segment.getOffset(), segment.getLength()); + segment.getOffset(), segment.getLength(), 0); // read shuffle data return new ShuffleDataResult( diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java index fdb83a9fed..77aaa6c52b 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java @@ -621,7 +621,7 @@ public void rpcMetricsTest() throws Exception { ShuffleServerGrpcMetrics.GET_SHUFFLE_INDEX_METHOD).get(); try { shuffleServerClient.getShuffleIndex(new RssGetShuffleIndexRequest( - appId, shuffleId, 1, 1, 3)); + appId, shuffleId, 1, 1, 3, 0)); } catch (Exception e) { // ignore the exception, just test metrics value } @@ -637,7 +637,7 @@ public void rpcMetricsTest() throws Exception { try { shuffleServerClient.getShuffleData(new RssGetShuffleDataRequest( appId, shuffleId, 0, 1, 3, - 0, 100)); + 0, 100, 0)); } catch (Exception e) { // ignore the exception, just test metrics value } diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java index 74fe2767d3..410b180030 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java @@ -537,6 +537,7 @@ public RssGetShuffleDataResponse getShuffleData(RssGetShuffleDataRequest request .setOffset(request.getOffset()) .setLength(request.getLength()) .setTimestamp(start) + .setStorageId(request.getStorageId()) .build(); GetLocalShuffleDataResponse rpcResponse = getBlockingStub().getLocalShuffleData(rpcRequest); String requestInfo = "appId[" + request.getAppId() + "], shuffleId[" @@ -571,6 +572,7 @@ public RssGetShuffleIndexResponse getShuffleIndex(RssGetShuffleIndexRequest requ .setPartitionId(request.getPartitionId()) .setPartitionNumPerRange(request.getPartitionNumPerRange()) .setPartitionNum(request.getPartitionNum()) + .setStorageId(request.getStorageId()) .build(); long start = System.currentTimeMillis(); GetLocalShuffleIndexResponse rpcResponse = getBlockingStub().getLocalShuffleIndex(rpcRequest); diff --git a/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleDataRequest.java b/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleDataRequest.java index 073ed79e44..81a5ec5ead 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleDataRequest.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleDataRequest.java @@ -26,9 +26,10 @@ public class RssGetShuffleDataRequest { private final int partitionNum; private final long offset; private final int length; + private final int storageId; public RssGetShuffleDataRequest(String appId, int shuffleId, int partitionId, int partitionNumPerRange, - int partitionNum, long offset, int length) { + int partitionNum, long offset, int length, int storageId) { this.appId = appId; this.shuffleId = shuffleId; this.partitionId = partitionId; @@ -36,6 +37,7 @@ public RssGetShuffleDataRequest(String appId, int shuffleId, int partitionId, in this.partitionNum = partitionNum; this.offset = offset; this.length = length; + this.storageId = storageId; } public String getAppId() { @@ -65,4 +67,8 @@ public long getOffset() { public int getLength() { return length; } + + public int getStorageId() { + return storageId; + } } diff --git a/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleIndexRequest.java b/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleIndexRequest.java index 6d4fa451d1..d66faa4371 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleIndexRequest.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleIndexRequest.java @@ -24,18 +24,21 @@ public class RssGetShuffleIndexRequest { private final int partitionId; private final int partitionNumPerRange; private final int partitionNum; + private final int storageId; public RssGetShuffleIndexRequest( String appId, int shuffleId, int partitionId, int partitionNumPerRange, - int partitionNum) { + int partitionNum, + int storageId) { this.appId = appId; this.shuffleId = shuffleId; this.partitionId = partitionId; this.partitionNumPerRange = partitionNumPerRange; this.partitionNum = partitionNum; + this.storageId = storageId; } public String getAppId() { @@ -58,4 +61,7 @@ public int getPartitionNum() { return partitionNum; } + public int getStorageId() { + return storageId; + } } diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto index db3c6a07fb..1abcfacf01 100644 --- a/proto/src/main/proto/Rss.proto +++ b/proto/src/main/proto/Rss.proto @@ -77,6 +77,7 @@ message GetLocalShuffleDataRequest { int64 offset = 6; int32 length = 7; int64 timestamp = 8; + int32 storageId = 9; } message GetLocalShuffleDataResponse { @@ -108,6 +109,7 @@ message GetLocalShuffleIndexRequest { int32 partitionId = 3; int32 partitionNumPerRange = 4; int32 partitionNum = 5; + int32 storageId = 6; } message GetLocalShuffleIndexResponse { diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleDataReadEvent.java b/server/src/main/java/org/apache/uniffle/server/ShuffleDataReadEvent.java index 0f1804efae..fdab707840 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleDataReadEvent.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleDataReadEvent.java @@ -17,13 +17,30 @@ package org.apache.uniffle.server; +import com.google.common.annotations.VisibleForTesting; + public class ShuffleDataReadEvent { private String appId; private int shuffleId; private int partitionId; private int startPartition; + private int storageIndex = 0; + public ShuffleDataReadEvent( + String appId, + int shuffleId, + int partitionId, + int startPartitionOfRange, + int storageIndex) { + this.appId = appId; + this.shuffleId = shuffleId; + this.partitionId = partitionId; + this.startPartition = startPartitionOfRange; + this.storageIndex = storageIndex; + } + + @VisibleForTesting public ShuffleDataReadEvent(String appId, int shuffleId, int partitionId, int startPartitionOfRange) { this.appId = appId; this.shuffleId = shuffleId; @@ -46,4 +63,8 @@ public int getPartitionId() { public int getStartPartition() { return startPartition; } + + public int getStorageIndex() { + return storageIndex; + } } diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java index 384545986c..9967ecda0d 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java @@ -321,6 +321,12 @@ public class ShuffleServerConf extends RssBaseConf { .withDescription("The max concurrency of single partition writer, the data partition file number is " + "equal to this value. Default value is 1."); + public static final ConfigOption RSS_LOCAL_STORAGE_MULTIPLE_DISK_SELECTION_ENABLE = ConfigOptions + .key("rss.server.localstorage.multiple.disk.selection.enable") + .booleanType() + .defaultValue(false) + .withDescription(""); + public ShuffleServerConf() { } diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java index 054ffe550b..d1ab3df4f7 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java @@ -517,7 +517,7 @@ public void getLocalShuffleData(GetLocalShuffleDataRequest request, Storage storage = shuffleServer .getStorageManager() .selectStorage( - new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0]) + new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0], request.getStorageId()) ); if (storage != null) { storage.updateReadMetrics(new StorageReadMetrics(appId, shuffleId)); @@ -526,8 +526,11 @@ public void getLocalShuffleData(GetLocalShuffleDataRequest request, if (shuffleServer.getShuffleBufferManager().requireReadMemoryWithRetry(length)) { try { long start = System.currentTimeMillis(); - sdr = shuffleServer.getShuffleTaskManager().getShuffleData(appId, shuffleId, partitionId, - partitionNumPerRange, partitionNum, storageType, offset, length); + sdr = shuffleServer.getShuffleTaskManager().getShuffleData( + appId, shuffleId, partitionId, + partitionNumPerRange, partitionNum, storageType, offset, length, + request.getStorageId() + ); long readTime = System.currentTimeMillis() - start; ShuffleServerMetrics.counterTotalReadTime.inc(readTime); ShuffleServerMetrics.counterTotalReadDataSize.inc(sdr.getData().length); @@ -573,6 +576,8 @@ public void getLocalShuffleIndex(GetLocalShuffleIndexRequest request, int partitionId = request.getPartitionId(); int partitionNumPerRange = request.getPartitionNumPerRange(); int partitionNum = request.getPartitionNum(); + int storageId = request.getStorageId(); + StatusCode status = StatusCode.SUCCESS; String msg = "OK"; GetLocalShuffleIndexResponse reply; @@ -581,7 +586,7 @@ public void getLocalShuffleIndex(GetLocalShuffleIndexRequest request, int[] range = ShuffleStorageUtils.getPartitionRange(partitionId, partitionNumPerRange, partitionNum); Storage storage = shuffleServer.getStorageManager() - .selectStorage(new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0])); + .selectStorage(new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0], storageId)); if (storage != null) { storage.updateReadMetrics(new StorageReadMetrics(appId, shuffleId)); } @@ -593,7 +598,7 @@ public void getLocalShuffleIndex(GetLocalShuffleIndexRequest request, try { long start = System.currentTimeMillis(); ShuffleIndexResult shuffleIndexResult = shuffleServer.getShuffleTaskManager().getShuffleIndex( - appId, shuffleId, partitionId, partitionNumPerRange, partitionNum); + appId, shuffleId, partitionId, partitionNumPerRange, partitionNum, storageId); long readTime = System.currentTimeMillis() - start; byte[] data = shuffleIndexResult.getIndexData(); @@ -608,9 +613,11 @@ public void getLocalShuffleIndex(GetLocalShuffleIndexRequest request, builder.setIndexData(UnsafeByteOperations.unsafeWrap(data)); builder.setDataFileLen(shuffleIndexResult.getDataFileLen()); reply = builder.build(); - } catch (FileNotFoundException indexFileNotFoundException) { - LOG.warn("Index file for {} is not found, maybe the data has been flushed to cold storage.", - requestInfo, indexFileNotFoundException); + } catch (FileNotFoundException | IndexOutOfBoundsException exception) { + if (exception instanceof FileNotFoundException) { + LOG.warn("Index file for {} is not found, maybe the data has been flushed to cold storage.", + requestInfo, exception); + } reply = GetLocalShuffleIndexResponse.newBuilder() .setStatus(valueOf(status)) .build(); diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java index 5e74eabf1e..10c37404f8 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java @@ -391,7 +391,7 @@ public ShuffleDataResult getInMemoryShuffleData( public ShuffleDataResult getShuffleData( String appId, Integer shuffleId, Integer partitionId, int partitionNumPerRange, - int partitionNum, String storageType, long offset, int length) { + int partitionNum, String storageType, long offset, int length, int storageId) { refreshAppId(appId); CreateShuffleReadHandlerRequest request = new CreateShuffleReadHandlerRequest(); @@ -403,20 +403,23 @@ public ShuffleDataResult getShuffleData( request.setStorageType(storageType); request.setRssBaseConf(conf); int[] range = ShuffleStorageUtils.getPartitionRange(partitionId, partitionNumPerRange, partitionNum); - Storage storage = storageManager.selectStorage(new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0])); + Storage storage = storageManager.selectStorage( + new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0], storageId) + ); if (storage == null) { throw new FileNotFoundException("No such data stored in current storage manager."); } return storage.getOrCreateReadHandler(request).getShuffleData(offset, length); } - + public ShuffleIndexResult getShuffleIndex( String appId, Integer shuffleId, Integer partitionId, int partitionNumPerRange, - int partitionNum) { + int partitionNum, + int storageId) { refreshAppId(appId); String storageType = conf.getString(RssBaseConf.RSS_STORAGE_TYPE); CreateShuffleReadHandlerRequest request = new CreateShuffleReadHandlerRequest(); @@ -428,7 +431,9 @@ public ShuffleIndexResult getShuffleIndex( request.setStorageType(storageType); request.setRssBaseConf(conf); int[] range = ShuffleStorageUtils.getPartitionRange(partitionId, partitionNumPerRange, partitionNum); - Storage storage = storageManager.selectStorage(new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0])); + Storage storage = storageManager.selectStorage( + new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0], storageId) + ); if (storage == null) { throw new FileNotFoundException("No such data in current storage manager."); } diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java index 091a32d917..cd1976679d 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java @@ -22,9 +22,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; -import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; @@ -32,11 +30,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Function; import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Maps; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -44,7 +40,6 @@ import org.slf4j.LoggerFactory; import org.apache.uniffle.common.RemoteStorageInfo; -import org.apache.uniffle.common.UnionKey; import org.apache.uniffle.common.util.RssUtils; import org.apache.uniffle.server.Checker; import org.apache.uniffle.server.LocalStorageChecker; @@ -55,6 +50,8 @@ import org.apache.uniffle.server.event.AppPurgeEvent; import org.apache.uniffle.server.event.PurgeEvent; import org.apache.uniffle.server.event.ShufflePurgeEvent; +import org.apache.uniffle.server.storage.local.AbstractCacheableStorageSelector; +import org.apache.uniffle.server.storage.local.ChainableLocalStorageSelector; import org.apache.uniffle.storage.common.LocalStorage; import org.apache.uniffle.storage.common.Storage; import org.apache.uniffle.storage.factory.ShuffleHandlerFactory; @@ -73,7 +70,7 @@ public class LocalStorageManager extends SingleStorageManager { private final List storageBasePaths; private final LocalStorageChecker checker; - private final Map partitionsOfStorage; + private final AbstractCacheableStorageSelector selector; @VisibleForTesting LocalStorageManager(ShuffleServerConf conf) { @@ -82,7 +79,6 @@ public class LocalStorageManager extends SingleStorageManager { if (CollectionUtils.isEmpty(storageBasePaths)) { throw new IllegalArgumentException("Base path dirs must not be empty"); } - this.partitionsOfStorage = Maps.newConcurrentMap(); long shuffleExpiredTimeoutMs = conf.get(ShuffleServerConf.SHUFFLE_EXPIRED_TIMEOUT_MS); long capacity = conf.getSizeAsBytes(ShuffleServerConf.DISK_CAPACITY); double highWaterMarkOfWrite = conf.get(ShuffleServerConf.HIGH_WATER_MARK_OF_WRITE); @@ -129,7 +125,7 @@ public class LocalStorageManager extends SingleStorageManager { if (failedCount > maxFailedNumber || successCount.get() == 0) { throw new RuntimeException( String.format("Initialize %s local storage(s) failed, " - + "specified local storage paths size: %s, the conf of %s size: %s", + + "specified local storage paths size: %s, the conf of %s size: %s", failedCount, localStorageArray.length, LOCAL_STORAGE_INITIALIZE_MAX_FAIL_NUMBER.key(), maxFailedNumber) ); } @@ -139,59 +135,17 @@ public class LocalStorageManager extends SingleStorageManager { StringUtils.join(localStorages.stream().map(LocalStorage::getBasePath).collect(Collectors.toList())) ); this.checker = new LocalStorageChecker(conf, localStorages); + this.selector = new ChainableLocalStorageSelector(conf, localStorages); } @Override public Storage selectStorage(ShuffleDataFlushEvent event) { - String appId = event.getAppId(); - int shuffleId = event.getShuffleId(); - int partitionId = event.getStartPartition(); - - LocalStorage storage = partitionsOfStorage.get(UnionKey.buildKey(appId, shuffleId, partitionId)); - if (storage != null) { - if (storage.isCorrupted()) { - if (storage.containsWriteHandler(appId, shuffleId, partitionId)) { - LOG.error("LocalStorage: {} is corrupted. Switching another storage for event: {}, some data will be lost", - storage.getBasePath(), event); - } - } else { - return storage; - } - } - - List candidates = localStorages - .stream() - .filter(x -> x.canWrite() && !x.isCorrupted()) - .collect(Collectors.toList()); - final LocalStorage selectedStorage = candidates.get( - ShuffleStorageUtils.getStorageIndex( - candidates.size(), - appId, - shuffleId, - partitionId - ) - ); - return partitionsOfStorage.compute( - UnionKey.buildKey(appId, shuffleId, partitionId), - (key, localStorage) -> { - // If this is the first time to select storage or existing storage is corrupted, - // we should refresh the cache. - if (localStorage == null || localStorage.isCorrupted()) { - event.setUnderStorage(selectedStorage); - return selectedStorage; - } - return localStorage; - }); + return selector.selectForWriter(event); } @Override public Storage selectStorage(ShuffleDataReadEvent event) { - String appId = event.getAppId(); - int shuffleId = event.getShuffleId(); - int partitionId = event.getStartPartition(); - - LocalStorage storage = partitionsOfStorage.get(UnionKey.buildKey(appId, shuffleId, partitionId)); - return storage; + return selector.getForReader(event); } @Override @@ -212,7 +166,7 @@ public void removeResources(PurgeEvent event) { List shuffleSet = Optional.ofNullable(event.getShuffleIds()).orElse(Collections.emptyList()); // Remove partitions to storage mapping cache - cleanupStorageSelectionCache(event); + selector.removeCache(event); for (LocalStorage storage : localStorages) { if (event instanceof AppPurgeEvent) { @@ -245,37 +199,6 @@ public void removeResources(PurgeEvent event) { deleteHandler.delete(deletePaths.toArray(new String[deletePaths.size()]), appId, user); } - private void cleanupStorageSelectionCache(PurgeEvent event) { - Function deleteConditionFunc = null; - if (event instanceof AppPurgeEvent) { - deleteConditionFunc = partitionUnionKey -> UnionKey.startsWith(partitionUnionKey, event.getAppId()); - } else if (event instanceof ShufflePurgeEvent) { - deleteConditionFunc = - partitionUnionKey -> UnionKey.startsWith( - partitionUnionKey, - event.getAppId(), - event.getShuffleIds() - ); - } - long startTime = System.currentTimeMillis(); - deleteElement( - partitionsOfStorage, - deleteConditionFunc - ); - LOG.info("Cleaning the storage selection cache costs: {}(ms) for event: {}", - System.currentTimeMillis() - startTime, event); - } - - private void deleteElement(Map map, Function deleteConditionFunc) { - Iterator> iterator = map.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - if (deleteConditionFunc.apply(entry.getKey())) { - iterator.remove(); - } - } - } - @Override public void registerRemoteStorage(String appId, RemoteStorageInfo remoteStorageInfo) { // ignore @@ -295,7 +218,7 @@ public void checkAndClearLeakedShuffleData(Collection appIds) { if (!appIds.contains(appId)) { ShuffleDeleteHandler deleteHandler = ShuffleHandlerFactory.getInstance() .createShuffleDeleteHandler( - new CreateShuffleDeleteHandlerRequest(StorageType.LOCALFILE.name(), new Configuration())); + new CreateShuffleDeleteHandlerRequest(StorageType.LOCALFILE.name(), new Configuration())); String[] deletePaths = new String[storageBasePaths.size()]; for (int i = 0; i < storageBasePaths.size(); i++) { deletePaths[i] = ShuffleStorageUtils.getFullShuffleDataFolder(storageBasePaths.get(i), appId); diff --git a/server/src/main/java/org/apache/uniffle/server/storage/local/AbstractCacheableStorageSelector.java b/server/src/main/java/org/apache/uniffle/server/storage/local/AbstractCacheableStorageSelector.java new file mode 100644 index 0000000000..dc16ac89fe --- /dev/null +++ b/server/src/main/java/org/apache/uniffle/server/storage/local/AbstractCacheableStorageSelector.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.server.storage.local; + +import org.apache.uniffle.server.event.PurgeEvent; +import org.apache.uniffle.storage.common.Storage; + +public abstract class AbstractCacheableStorageSelector implements StorageSelector { + + public abstract void removeCache(PurgeEvent event); + +} diff --git a/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageSelector.java b/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageSelector.java new file mode 100644 index 0000000000..ca9b2b75d4 --- /dev/null +++ b/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageSelector.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.server.storage.local; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.uniffle.common.UnionKey; +import org.apache.uniffle.server.ShuffleDataFlushEvent; +import org.apache.uniffle.server.ShuffleDataReadEvent; +import org.apache.uniffle.server.ShuffleServerConf; +import org.apache.uniffle.server.event.AppPurgeEvent; +import org.apache.uniffle.server.event.PurgeEvent; +import org.apache.uniffle.server.event.ShufflePurgeEvent; +import org.apache.uniffle.storage.common.LocalStorage; +import org.apache.uniffle.storage.common.Storage; +import org.apache.uniffle.storage.util.ShuffleStorageUtils; + +import static org.apache.uniffle.server.ShuffleServerConf.RSS_LOCAL_STORAGE_MULTIPLE_DISK_SELECTION_ENABLE; + +public class ChainableLocalStorageSelector extends AbstractCacheableStorageSelector { + private static final Logger LOGGER = LoggerFactory.getLogger(ChainableLocalStorageSelector.class); + + private final List localStorages; + private final Map viewOfPartitions; + private final boolean multipleDiskSelectionEnable; + + public ChainableLocalStorageSelector(ShuffleServerConf shuffleServerConf, List localStorages) { + this.localStorages = localStorages; + this.viewOfPartitions = Maps.newConcurrentMap(); + this.multipleDiskSelectionEnable = shuffleServerConf.get(RSS_LOCAL_STORAGE_MULTIPLE_DISK_SELECTION_ENABLE); + } + + @Override + public Storage selectForWriter(ShuffleDataFlushEvent event) { + String appId = event.getAppId(); + int shuffleId = event.getShuffleId(); + int partitionId = event.getStartPartition(); + + String cacheKey = UnionKey.buildKey( + event.getAppId(), + event.getShuffleId(), + event.getStartPartition() + ); + ChainableLocalStorageView view = viewOfPartitions.get(cacheKey); + LocalStorage lastStorage = null; + if (view != null) { + lastStorage = view.get(); + if (lastStorage.isCorrupted()) { + if (lastStorage.containsWriteHandler(appId, shuffleId, partitionId)) { + LOGGER.error("LocalStorage: {} is corrupted. Switching another storage for event: {}, " + + "some data will be lost", lastStorage.getBasePath(), event); + } + } else { + if (!multipleDiskSelectionEnable || lastStorage.canWrite()) { + return lastStorage; + } + } + } + + // todo: support pluggable selection policy, hash-based or free-space based + List candidates = localStorages + .stream() + .filter(x -> x.canWrite() && !x.isCorrupted()) + .collect(Collectors.toList()); + if (candidates.isEmpty()) { + throw new RuntimeException("No available local storages."); + } + final LocalStorage selected = candidates.get( + ShuffleStorageUtils.getStorageIndex( + candidates.size(), + appId, + shuffleId, + partitionId + ) + ); + + final LocalStorage previousStorage = lastStorage; + viewOfPartitions.compute( + cacheKey, + (key, storageView) -> { + if (storageView == null) { + return new ChainableLocalStorageView(selected); + } + // If the storage is corrupted, it should be removed from the storage view. + if (previousStorage != null && previousStorage.isCorrupted()) { + storageView.remove(previousStorage); + } + storageView.switchTo(selected); + return storageView; + } + ); + event.setUnderStorage(selected); + return selected; + } + + @Override + public Storage getForReader(ShuffleDataReadEvent event) { + try { + ChainableLocalStorageView view = viewOfPartitions.get( + UnionKey.buildKey( + event.getAppId(), + event.getShuffleId(), + event.getStartPartition() + ) + ); + if (view == null) { + return null; + } + return view.get(event.getStorageIndex()); + } catch (IndexOutOfBoundsException exception) { + LOGGER.error("No such local storage for event: " + event); + return null; + } + } + + @Override + public void removeCache(PurgeEvent event) { + Function deleteConditionFunc = null; + if (event instanceof AppPurgeEvent) { + deleteConditionFunc = partitionUnionKey -> UnionKey.startsWith(partitionUnionKey, event.getAppId()); + } else if (event instanceof ShufflePurgeEvent) { + deleteConditionFunc = + partitionUnionKey -> UnionKey.startsWith( + partitionUnionKey, + event.getAppId(), + event.getShuffleIds() + ); + } + deleteElement( + viewOfPartitions, + deleteConditionFunc + ); + } + + private void deleteElement(Map map, Function deleteConditionFunc) { + Iterator> iterator = map.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (deleteConditionFunc.apply(entry.getKey())) { + iterator.remove(); + } + } + } +} diff --git a/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageView.java b/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageView.java new file mode 100644 index 0000000000..84613199c9 --- /dev/null +++ b/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageView.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.server.storage.local; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.uniffle.storage.common.LocalStorage; + +/** + * This class is to wrap multiple local storages into single view. + */ +public class ChainableLocalStorageView { + + private final List localStorages; + + public ChainableLocalStorageView(LocalStorage localStorage) { + this.localStorages = new ArrayList<>(); + localStorages.add(localStorage); + } + + public void switchTo(LocalStorage localStorage) { + localStorages.add(localStorage); + } + + public LocalStorage get() { + return localStorages.get(localStorages.size() - 1); + } + + public LocalStorage get(int index) { + return localStorages.get(index); + } + + public void remove(LocalStorage localStorage) { + localStorages.remove(localStorage); + } +} diff --git a/server/src/main/java/org/apache/uniffle/server/storage/local/StorageSelector.java b/server/src/main/java/org/apache/uniffle/server/storage/local/StorageSelector.java new file mode 100644 index 0000000000..a77ba16469 --- /dev/null +++ b/server/src/main/java/org/apache/uniffle/server/storage/local/StorageSelector.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.server.storage.local; + +import org.apache.uniffle.server.ShuffleDataFlushEvent; +import org.apache.uniffle.server.ShuffleDataReadEvent; +import org.apache.uniffle.storage.common.Storage; + +public interface StorageSelector { + + T selectForWriter(ShuffleDataFlushEvent event); + + T getForReader(ShuffleDataReadEvent event); +} diff --git a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java index be42a2154d..24aaa998c8 100644 --- a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java @@ -35,9 +35,11 @@ import org.apache.uniffle.storage.common.Storage; import org.apache.uniffle.storage.util.StorageType; +import static org.apache.uniffle.server.ShuffleServerConf.RSS_LOCAL_STORAGE_MULTIPLE_DISK_SELECTION_ENABLE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -71,6 +73,59 @@ private ShuffleDataFlushEvent toDataFlushEvent(String appId, int shuffleId, int ); } + @Test + public void testDynamicStorageSelection() { + String[] storagePaths = { + "/tmp/rss-data1", + "/tmp/rss-data2", + "/tmp/rss-data3" + }; + + ShuffleServerConf conf = new ShuffleServerConf(); + conf.set(RSS_LOCAL_STORAGE_MULTIPLE_DISK_SELECTION_ENABLE, true); + conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(storagePaths)); + conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L); + conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name()); + + LocalStorageManager localStorageManager = new LocalStorageManager(conf); + + String appId = "testDynamicStorageSelection"; + ShuffleDataFlushEvent dataFlushEvent = toDataFlushEvent(appId, 1, 1); + + /** + * case1: normal selection + */ + Storage storage1 = localStorageManager.selectStorage(dataFlushEvent); + Storage storage2 = localStorageManager.selectStorage(dataFlushEvent); + assertEquals(storage1, storage2); + + /** + * case2: when one storage can't write, it will choose another storage + */ + // mark its storage full + ((LocalStorage)storage1).getMetaData().setSize(1024); + Storage storage3 = localStorageManager.selectStorage(dataFlushEvent); + assertNotEquals(storage1, storage3); + + /** + * case3: when all storages can't write, it will directly throw exception to make event fail. + */ + ((LocalStorage)storage3).getMetaData().setSize(1024); + Storage storage4 = localStorageManager.selectStorage(dataFlushEvent); + assertNotEquals(storage1, storage4); + assertNotEquals(storage3, storage4); + // This data flush event's view has 3 local storages. + localStorageManager.getStorages().stream().forEach(localStorage -> { + localStorage.getMetaData().setSize(1024); + }); + try { + Storage storage5 = localStorageManager.selectStorage(dataFlushEvent); + fail(); + } catch (Exception e) { + // ignore + } + } + @Test public void testStorageSelectionWhenReachingHighWatermark() { String[] storagePaths = { @@ -80,6 +135,7 @@ public void testStorageSelectionWhenReachingHighWatermark() { }; ShuffleServerConf conf = new ShuffleServerConf(); + conf.set(RSS_LOCAL_STORAGE_MULTIPLE_DISK_SELECTION_ENABLE, true); conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(storagePaths)); conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L); conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name()); @@ -89,12 +145,21 @@ public void testStorageSelectionWhenReachingHighWatermark() { String appId = "testStorageSelectionWhenReachingHighWatermark"; ShuffleDataFlushEvent dataFlushEvent = toDataFlushEvent(appId, 1, 1); Storage storage1 = localStorageManager.selectStorage(dataFlushEvent); - - ((LocalStorage) storage1).getMetaData().setSize(999); - localStorageManager = new LocalStorageManager(conf); Storage storage2 = localStorageManager.selectStorage(dataFlushEvent); + assertEquals(storage1, storage2); - assertNotEquals(storage1, storage2); + ((LocalStorage)storage1).getMetaData().setSize(999); + Storage storage3 = localStorageManager.selectStorage(dataFlushEvent); + assertNotEquals(storage1, storage3); + + // Select storage for read event + ShuffleDataReadEvent dataReadEvent1 = new ShuffleDataReadEvent(appId, 1, 1, 1, 0); + assertEquals(storage1, localStorageManager.selectStorage(dataReadEvent1)); + ShuffleDataReadEvent dataReadEvent2 = new ShuffleDataReadEvent(appId, 1, 1, 1, 1); + assertEquals(storage3, localStorageManager.selectStorage(dataReadEvent2)); + // if selecting storage for out-of-range storage id, it will return null + ShuffleDataReadEvent dataReadEvent3 = new ShuffleDataReadEvent(appId, 1, 1, 1, 2); + assertNull(localStorageManager.selectStorage(dataReadEvent3)); } @Test @@ -127,9 +192,12 @@ public void testStorageSelection() { // case2: one storage is corrupted, and it will switch to other storage at the first time of writing // event of (appId, shuffleId, startPartition) - ((LocalStorage)storage1).markCorrupted(); + markCorrupted(storage1); Storage storage4 = localStorageManager.selectStorage(dataFlushEvent1); - assertNotEquals(storage4.getStoragePath(), storage1.getStoragePath()); + assertNotEquals( + storage4.getStoragePath(), + storage1.getStoragePath() + ); assertEquals(localStorageManager.selectStorage(dataReadEvent), storage4); // case3: one storage is corrupted when it happened after the original event has been written, @@ -151,6 +219,10 @@ public void testStorageSelection() { assertEquals(storage7, storage8); } + private void markCorrupted(Storage storage) { + ((LocalStorage)storage).markCorrupted(); + } + @Test public void testInitLocalStorageManager() { String[] storagePaths = {"/tmp/rssdata", "/tmp/rssdata2"}; diff --git a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java index 912534823a..7cb997c908 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java +++ b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java @@ -36,7 +36,7 @@ import org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler; import org.apache.uniffle.storage.handler.impl.HdfsClientReadHandler; import org.apache.uniffle.storage.handler.impl.HdfsShuffleDeleteHandler; -import org.apache.uniffle.storage.handler.impl.LocalFileClientReadHandler; +import org.apache.uniffle.storage.handler.impl.LocalFileClientReadMultiFileHandler; import org.apache.uniffle.storage.handler.impl.LocalFileDeleteHandler; import org.apache.uniffle.storage.handler.impl.MemoryClientReadHandler; import org.apache.uniffle.storage.handler.impl.MultiReplicaClientReadHandler; @@ -140,7 +140,7 @@ private ClientReadHandler getLocalfileClientReaderHandler(CreateShuffleReadHandl ShuffleServerInfo ssi) { ShuffleServerClient shuffleServerClient = ShuffleServerClientFactory.getInstance().getShuffleServerClient( ClientType.GRPC.name(), ssi); - return new LocalFileClientReadHandler( + return new LocalFileClientReadMultiFileHandler( request.getAppId(), request.getShuffleId(), request.getPartitionId(), request.getIndexReadLimit(), request.getPartitionNumPerRange(), request.getPartitionNum(), request.getReadBufferSize(), request.getExpectBlockIds(), request.getProcessBlockIds(), diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java index a2d9694021..89900bc324 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java +++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java @@ -37,6 +37,31 @@ public class LocalFileClientReadHandler extends DataSkippableReadHandler { private final int partitionNumPerRange; private final int partitionNum; private ShuffleServerClient shuffleServerClient; + private int storageIndex; + + public LocalFileClientReadHandler( + String appId, + int shuffleId, + int partitionId, + int indexReadLimit, + int partitionNumPerRange, + int partitionNum, + int readBufferSize, + Roaring64NavigableMap expectBlockIds, + Roaring64NavigableMap processBlockIds, + ShuffleServerClient shuffleServerClient, + ShuffleDataDistributionType distributionType, + Roaring64NavigableMap expectTaskIds, + int storageIndex) { + super( + appId, shuffleId, partitionId, readBufferSize, expectBlockIds, + processBlockIds, distributionType, expectTaskIds + ); + this.shuffleServerClient = shuffleServerClient; + this.partitionNumPerRange = partitionNumPerRange; + this.partitionNum = partitionNum; + this.storageIndex = storageIndex; + } public LocalFileClientReadHandler( String appId, @@ -58,6 +83,7 @@ public LocalFileClientReadHandler( this.shuffleServerClient = shuffleServerClient; this.partitionNumPerRange = partitionNumPerRange; this.partitionNum = partitionNum; + this.storageIndex = 0; } /** @@ -85,7 +111,7 @@ public LocalFileClientReadHandler( public ShuffleIndexResult readShuffleIndex() { ShuffleIndexResult shuffleIndexResult = null; RssGetShuffleIndexRequest request = new RssGetShuffleIndexRequest( - appId, shuffleId, partitionId, partitionNumPerRange, partitionNum); + appId, shuffleId, partitionId, partitionNumPerRange, partitionNum, storageIndex); try { shuffleIndexResult = shuffleServerClient.getShuffleIndex(request).getShuffleIndexResult(); } catch (Exception e) { @@ -106,7 +132,7 @@ public ShuffleDataResult readShuffleData(ShuffleDataSegment shuffleDataSegment) } RssGetShuffleDataRequest request = new RssGetShuffleDataRequest( appId, shuffleId, partitionId, partitionNumPerRange, partitionNum, - shuffleDataSegment.getOffset(), expectedLength); + shuffleDataSegment.getOffset(), expectedLength, storageIndex); try { RssGetShuffleDataResponse response = shuffleServerClient.getShuffleData(request); result = new ShuffleDataResult(response.getShuffleData(), shuffleDataSegment.getBufferSegments()); diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadMultiFileHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadMultiFileHandler.java new file mode 100644 index 0000000000..b57f7bd753 --- /dev/null +++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadMultiFileHandler.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.storage.handler.impl; + +import org.roaringbitmap.longlong.Roaring64NavigableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.uniffle.client.api.ShuffleServerClient; +import org.apache.uniffle.common.ShuffleDataDistributionType; +import org.apache.uniffle.common.ShuffleDataResult; + +public class LocalFileClientReadMultiFileHandler extends AbstractClientReadHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(LocalFileClientReadMultiFileHandler.class); + + private int storageIndex = 0; + private LocalFileClientReadHandler readHandler; + private int currentHandlerReadCount = 0; + + private String appId; + private int shuffleId; + private int partitionId; + private int indexReadLimit; + private int partitionNumPerRange; + private int partitionNum; + private int readBufferSize; + private Roaring64NavigableMap expectBlockIds; + private Roaring64NavigableMap processBlockIds; + private ShuffleServerClient shuffleServerClient; + private ShuffleDataDistributionType distributionType; + private Roaring64NavigableMap expectTaskIds; + + public LocalFileClientReadMultiFileHandler( + String appId, + int shuffleId, + int partitionId, + int indexReadLimit, + int partitionNumPerRange, + int partitionNum, + int readBufferSize, + Roaring64NavigableMap expectBlockIds, + Roaring64NavigableMap processBlockIds, + ShuffleServerClient shuffleServerClient, + ShuffleDataDistributionType distributionType, + Roaring64NavigableMap expectTaskIds) { + this.appId = appId; + this.shuffleId = shuffleId; + this.partitionId = partitionId; + this.indexReadLimit = indexReadLimit; + this.partitionNumPerRange = partitionNumPerRange; + this.partitionNum = partitionNum; + this.readBufferSize = readBufferSize; + this.expectBlockIds = expectBlockIds; + this.processBlockIds = processBlockIds; + this.shuffleServerClient = shuffleServerClient; + this.distributionType = distributionType; + this.expectTaskIds = expectTaskIds; + } + + @Override + public ShuffleDataResult readShuffleData() { + /** + * step1: read shuffle index from storage-{i} + * step2: read shuffle data from storage-{i} + * step3: if shuffle data is null, then read next index from storage-{i+1}. + * - if empty, return. + * - else, return step1 + */ + + if (readHandler == null) { + LOGGER.info("Initializing local file client read handler with storage index: {}", storageIndex); + this.readHandler = initHandler(storageIndex); + } + + ShuffleDataResult shuffleDataResult = readHandler.readShuffleData(); + + if (shuffleDataResult == null && currentHandlerReadCount == 0) { + return null; + } + + if (shuffleDataResult == null) { + readHandler.close(); + storageIndex += 1; + this.readHandler = initHandler(storageIndex); + shuffleDataResult = readHandler.readShuffleData(); + if (shuffleDataResult == null) { + return null; + } + LOGGER.info("Switched next local file client read handler with storage index: {}", storageIndex); + } + + currentHandlerReadCount++; + return shuffleDataResult; + } + + @Override + public void close() { + if (readHandler != null) { + readHandler.close(); + } + } + + private LocalFileClientReadHandler initHandler(int storageIndex) { + LocalFileClientReadHandler handler = new LocalFileClientReadHandler( + appId, + shuffleId, + partitionId, + indexReadLimit, + partitionNumPerRange, + partitionNum, + readBufferSize, + expectBlockIds, + processBlockIds, + shuffleServerClient, + distributionType, + expectTaskIds, + storageIndex + ); + return handler; + } +} diff --git a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java index 0801893a79..a7c2cf47a3 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java +++ b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java @@ -46,7 +46,6 @@ public class CreateShuffleReadHandlerRequest { private ShuffleDataDistributionType distributionType; private Roaring64NavigableMap expectTaskIds; private boolean expectedTaskIdsBitmapFilterEnable; - private IdHelper idHelper; public CreateShuffleReadHandlerRequest() {