From 6955490cc5d7d1c9889bf811759863f86cfdaf2a Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Fri, 16 Dec 2022 14:21:24 +0800 Subject: [PATCH 1/9] LocalStorageSwitch --- .../uniffle/common/config/RssBaseConf.java | 6 + .../uniffle/test/ShuffleReadWriteBase.java | 8 +- .../uniffle/test/ShuffleServerGrpcTest.java | 4 +- .../impl/grpc/ShuffleServerGrpcClient.java | 2 + .../request/RssGetShuffleDataRequest.java | 8 +- .../request/RssGetShuffleIndexRequest.java | 8 +- proto/src/main/proto/Rss.proto | 2 + .../uniffle/server/ShuffleDataReadEvent.java | 16 +++ .../server/ShuffleServerGrpcService.java | 17 ++- .../uniffle/server/ShuffleTaskManager.java | 9 +- .../server/storage/LocalStorageManager.java | 97 ++----------- .../AbstractCacheableStorageSelector.java | 10 ++ .../local/ChainableLocalStorageSelector.java | 136 ++++++++++++++++++ .../storage/local/LocalStorageView.java | 32 +++++ .../server/storage/local/StorageSelector.java | 12 ++ .../storage/LocalStorageManagerTest.java | 71 ++++++++- .../storage/common/ChainableLocalStorage.java | 129 +++++++++++++++++ .../factory/ShuffleHandlerFactory.java | 4 +- .../impl/LocalFileClientReadHandler.java | 29 +++- .../LocalFileClientReadMultiFileHandler.java | 136 ++++++++++++++++++ .../CreateShuffleReadHandlerRequest.java | 9 ++ 21 files changed, 634 insertions(+), 111 deletions(-) create mode 100644 server/src/main/java/org/apache/uniffle/server/storage/local/AbstractCacheableStorageSelector.java create mode 100644 server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageSelector.java create mode 100644 server/src/main/java/org/apache/uniffle/server/storage/local/LocalStorageView.java create mode 100644 server/src/main/java/org/apache/uniffle/server/storage/local/StorageSelector.java create mode 100644 storage/src/main/java/org/apache/uniffle/storage/common/ChainableLocalStorage.java create mode 100644 storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadMultiFileHandler.java diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java index 1d794fb3bc..8c4e28d9e9 100644 --- a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java +++ b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java @@ -147,6 +147,12 @@ public class RssBaseConf extends RssConf { .noDefaultValue() .withDescription("Common storage path for remote shuffle data"); + public static final ConfigOption RSS_LOCAL_STORAGE_MULTIPLE_DISK_SELECTION_ENABLE = ConfigOptions + .key("rss.storage.multiple.disk.selection.enable") + .booleanType() + .defaultValue(false) + .withDescription(""); + public static final ConfigOption RPC_EXECUTOR_SIZE = ConfigOptions .key("rss.rpc.executor.size") .intType() diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java index ca0afadcda..f6caa9f291 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java @@ -130,7 +130,7 @@ public static List readShuffleIndexSegments( int readBufferSize) { // read index file RssGetShuffleIndexRequest rgsir = new RssGetShuffleIndexRequest( - appId, shuffleId, partitionId, partitionNumPerRange, partitionNum); + appId, shuffleId, partitionId, partitionNumPerRange, partitionNum, 0); ShuffleIndexResult shuffleIndexResult = shuffleServerClient.getShuffleIndex(rgsir).getShuffleIndexResult(); return new FixedSizeSegmentSplitter(readBufferSize).split(shuffleIndexResult); } @@ -152,7 +152,7 @@ public static ShuffleDataResult readShuffleData( ShuffleDataSegment segment = sds.get(segmentIndex); RssGetShuffleDataRequest rgsdr = new RssGetShuffleDataRequest( appId, shuffleId, partitionId, partitionNumPerRange, partitionNum, - segment.getOffset(), segment.getLength()); + segment.getOffset(), segment.getLength(), 0); return new ShuffleDataResult( shuffleServerClient.getShuffleData(rgsdr).getShuffleData(), @@ -170,7 +170,7 @@ public static ShuffleDataResult readShuffleData( int segmentIndex, SegmentSplitter segmentSplitter) { RssGetShuffleIndexRequest rgsir = new RssGetShuffleIndexRequest( - appId, shuffleId, partitionId, partitionNumPerRange, partitionNum); + appId, shuffleId, partitionId, partitionNumPerRange, partitionNum, 0); ShuffleIndexResult shuffleIndexResult = shuffleServerClient.getShuffleIndex(rgsir).getShuffleIndexResult(); if (shuffleIndexResult == null) { return new ShuffleDataResult(); @@ -184,7 +184,7 @@ public static ShuffleDataResult readShuffleData( ShuffleDataSegment segment = sds.get(segmentIndex); RssGetShuffleDataRequest rgsdr = new RssGetShuffleDataRequest( appId, shuffleId, partitionId, partitionNumPerRange, partitionNum, - segment.getOffset(), segment.getLength()); + segment.getOffset(), segment.getLength(), 0); // read shuffle data return new ShuffleDataResult( diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java index fdb83a9fed..77aaa6c52b 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java @@ -621,7 +621,7 @@ public void rpcMetricsTest() throws Exception { ShuffleServerGrpcMetrics.GET_SHUFFLE_INDEX_METHOD).get(); try { shuffleServerClient.getShuffleIndex(new RssGetShuffleIndexRequest( - appId, shuffleId, 1, 1, 3)); + appId, shuffleId, 1, 1, 3, 0)); } catch (Exception e) { // ignore the exception, just test metrics value } @@ -637,7 +637,7 @@ public void rpcMetricsTest() throws Exception { try { shuffleServerClient.getShuffleData(new RssGetShuffleDataRequest( appId, shuffleId, 0, 1, 3, - 0, 100)); + 0, 100, 0)); } catch (Exception e) { // ignore the exception, just test metrics value } diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java index 74fe2767d3..410b180030 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java @@ -537,6 +537,7 @@ public RssGetShuffleDataResponse getShuffleData(RssGetShuffleDataRequest request .setOffset(request.getOffset()) .setLength(request.getLength()) .setTimestamp(start) + .setStorageId(request.getStorageId()) .build(); GetLocalShuffleDataResponse rpcResponse = getBlockingStub().getLocalShuffleData(rpcRequest); String requestInfo = "appId[" + request.getAppId() + "], shuffleId[" @@ -571,6 +572,7 @@ public RssGetShuffleIndexResponse getShuffleIndex(RssGetShuffleIndexRequest requ .setPartitionId(request.getPartitionId()) .setPartitionNumPerRange(request.getPartitionNumPerRange()) .setPartitionNum(request.getPartitionNum()) + .setStorageId(request.getStorageId()) .build(); long start = System.currentTimeMillis(); GetLocalShuffleIndexResponse rpcResponse = getBlockingStub().getLocalShuffleIndex(rpcRequest); diff --git a/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleDataRequest.java b/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleDataRequest.java index 073ed79e44..81a5ec5ead 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleDataRequest.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleDataRequest.java @@ -26,9 +26,10 @@ public class RssGetShuffleDataRequest { private final int partitionNum; private final long offset; private final int length; + private final int storageId; public RssGetShuffleDataRequest(String appId, int shuffleId, int partitionId, int partitionNumPerRange, - int partitionNum, long offset, int length) { + int partitionNum, long offset, int length, int storageId) { this.appId = appId; this.shuffleId = shuffleId; this.partitionId = partitionId; @@ -36,6 +37,7 @@ public RssGetShuffleDataRequest(String appId, int shuffleId, int partitionId, in this.partitionNum = partitionNum; this.offset = offset; this.length = length; + this.storageId = storageId; } public String getAppId() { @@ -65,4 +67,8 @@ public long getOffset() { public int getLength() { return length; } + + public int getStorageId() { + return storageId; + } } diff --git a/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleIndexRequest.java b/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleIndexRequest.java index 6d4fa451d1..d66faa4371 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleIndexRequest.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleIndexRequest.java @@ -24,18 +24,21 @@ public class RssGetShuffleIndexRequest { private final int partitionId; private final int partitionNumPerRange; private final int partitionNum; + private final int storageId; public RssGetShuffleIndexRequest( String appId, int shuffleId, int partitionId, int partitionNumPerRange, - int partitionNum) { + int partitionNum, + int storageId) { this.appId = appId; this.shuffleId = shuffleId; this.partitionId = partitionId; this.partitionNumPerRange = partitionNumPerRange; this.partitionNum = partitionNum; + this.storageId = storageId; } public String getAppId() { @@ -58,4 +61,7 @@ public int getPartitionNum() { return partitionNum; } + public int getStorageId() { + return storageId; + } } diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto index db3c6a07fb..1abcfacf01 100644 --- a/proto/src/main/proto/Rss.proto +++ b/proto/src/main/proto/Rss.proto @@ -77,6 +77,7 @@ message GetLocalShuffleDataRequest { int64 offset = 6; int32 length = 7; int64 timestamp = 8; + int32 storageId = 9; } message GetLocalShuffleDataResponse { @@ -108,6 +109,7 @@ message GetLocalShuffleIndexRequest { int32 partitionId = 3; int32 partitionNumPerRange = 4; int32 partitionNum = 5; + int32 storageId = 6; } message GetLocalShuffleIndexResponse { diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleDataReadEvent.java b/server/src/main/java/org/apache/uniffle/server/ShuffleDataReadEvent.java index 0f1804efae..55d8204522 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleDataReadEvent.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleDataReadEvent.java @@ -17,13 +17,25 @@ package org.apache.uniffle.server; +import com.google.common.annotations.VisibleForTesting; + public class ShuffleDataReadEvent { private String appId; private int shuffleId; private int partitionId; private int startPartition; + private int storageIndex = 0; + public ShuffleDataReadEvent(String appId, int shuffleId, int partitionId, int startPartitionOfRange, int storageIndex) { + this.appId = appId; + this.shuffleId = shuffleId; + this.partitionId = partitionId; + this.startPartition = startPartitionOfRange; + this.storageIndex = storageIndex; + } + + @VisibleForTesting public ShuffleDataReadEvent(String appId, int shuffleId, int partitionId, int startPartitionOfRange) { this.appId = appId; this.shuffleId = shuffleId; @@ -46,4 +58,8 @@ public int getPartitionId() { public int getStartPartition() { return startPartition; } + + public int getStorageIndex() { + return storageIndex; + } } diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java index 054ffe550b..cf82ce4574 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java @@ -526,8 +526,11 @@ public void getLocalShuffleData(GetLocalShuffleDataRequest request, if (shuffleServer.getShuffleBufferManager().requireReadMemoryWithRetry(length)) { try { long start = System.currentTimeMillis(); - sdr = shuffleServer.getShuffleTaskManager().getShuffleData(appId, shuffleId, partitionId, - partitionNumPerRange, partitionNum, storageType, offset, length); + sdr = shuffleServer.getShuffleTaskManager().getShuffleData( + appId, shuffleId, partitionId, + partitionNumPerRange, partitionNum, storageType, offset, length, + request.getStorageId() + ); long readTime = System.currentTimeMillis() - start; ShuffleServerMetrics.counterTotalReadTime.inc(readTime); ShuffleServerMetrics.counterTotalReadDataSize.inc(sdr.getData().length); @@ -593,7 +596,7 @@ public void getLocalShuffleIndex(GetLocalShuffleIndexRequest request, try { long start = System.currentTimeMillis(); ShuffleIndexResult shuffleIndexResult = shuffleServer.getShuffleTaskManager().getShuffleIndex( - appId, shuffleId, partitionId, partitionNumPerRange, partitionNum); + appId, shuffleId, partitionId, partitionNumPerRange, partitionNum, request.getStorageId()); long readTime = System.currentTimeMillis() - start; byte[] data = shuffleIndexResult.getIndexData(); @@ -608,9 +611,11 @@ public void getLocalShuffleIndex(GetLocalShuffleIndexRequest request, builder.setIndexData(UnsafeByteOperations.unsafeWrap(data)); builder.setDataFileLen(shuffleIndexResult.getDataFileLen()); reply = builder.build(); - } catch (FileNotFoundException indexFileNotFoundException) { - LOG.warn("Index file for {} is not found, maybe the data has been flushed to cold storage.", - requestInfo, indexFileNotFoundException); + } catch (FileNotFoundException | IndexOutOfBoundsException exception) { + if (exception instanceof FileNotFoundException) { + LOG.warn("Index file for {} is not found, maybe the data has been flushed to cold storage.", + requestInfo, exception); + } reply = GetLocalShuffleIndexResponse.newBuilder() .setStatus(valueOf(status)) .build(); diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java index 5e74eabf1e..f3c4e198d8 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java @@ -391,7 +391,7 @@ public ShuffleDataResult getInMemoryShuffleData( public ShuffleDataResult getShuffleData( String appId, Integer shuffleId, Integer partitionId, int partitionNumPerRange, - int partitionNum, String storageType, long offset, int length) { + int partitionNum, String storageType, long offset, int length, int storageId) { refreshAppId(appId); CreateShuffleReadHandlerRequest request = new CreateShuffleReadHandlerRequest(); @@ -402,6 +402,7 @@ public ShuffleDataResult getShuffleData( request.setPartitionNum(partitionNum); request.setStorageType(storageType); request.setRssBaseConf(conf); + request.setStorageSeqIndex(storageId); int[] range = ShuffleStorageUtils.getPartitionRange(partitionId, partitionNumPerRange, partitionNum); Storage storage = storageManager.selectStorage(new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0])); if (storage == null) { @@ -410,13 +411,14 @@ public ShuffleDataResult getShuffleData( return storage.getOrCreateReadHandler(request).getShuffleData(offset, length); } - + public ShuffleIndexResult getShuffleIndex( String appId, Integer shuffleId, Integer partitionId, int partitionNumPerRange, - int partitionNum) { + int partitionNum, + int storageId) { refreshAppId(appId); String storageType = conf.getString(RssBaseConf.RSS_STORAGE_TYPE); CreateShuffleReadHandlerRequest request = new CreateShuffleReadHandlerRequest(); @@ -427,6 +429,7 @@ public ShuffleIndexResult getShuffleIndex( request.setPartitionNum(partitionNum); request.setStorageType(storageType); request.setRssBaseConf(conf); + request.setStorageSeqIndex(storageId); int[] range = ShuffleStorageUtils.getPartitionRange(partitionId, partitionNumPerRange, partitionNum); Storage storage = storageManager.selectStorage(new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0])); if (storage == null) { diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java index 091a32d917..aefbeea050 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java @@ -22,9 +22,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; -import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; @@ -32,11 +30,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Function; import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Maps; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -44,7 +40,6 @@ import org.slf4j.LoggerFactory; import org.apache.uniffle.common.RemoteStorageInfo; -import org.apache.uniffle.common.UnionKey; import org.apache.uniffle.common.util.RssUtils; import org.apache.uniffle.server.Checker; import org.apache.uniffle.server.LocalStorageChecker; @@ -55,6 +50,8 @@ import org.apache.uniffle.server.event.AppPurgeEvent; import org.apache.uniffle.server.event.PurgeEvent; import org.apache.uniffle.server.event.ShufflePurgeEvent; +import org.apache.uniffle.server.storage.local.AbstractCacheableStorageSelector; +import org.apache.uniffle.server.storage.local.ChainableLocalStorageSelector; import org.apache.uniffle.storage.common.LocalStorage; import org.apache.uniffle.storage.common.Storage; import org.apache.uniffle.storage.factory.ShuffleHandlerFactory; @@ -73,7 +70,7 @@ public class LocalStorageManager extends SingleStorageManager { private final List storageBasePaths; private final LocalStorageChecker checker; - private final Map partitionsOfStorage; + private final AbstractCacheableStorageSelector selector; @VisibleForTesting LocalStorageManager(ShuffleServerConf conf) { @@ -82,7 +79,6 @@ public class LocalStorageManager extends SingleStorageManager { if (CollectionUtils.isEmpty(storageBasePaths)) { throw new IllegalArgumentException("Base path dirs must not be empty"); } - this.partitionsOfStorage = Maps.newConcurrentMap(); long shuffleExpiredTimeoutMs = conf.get(ShuffleServerConf.SHUFFLE_EXPIRED_TIMEOUT_MS); long capacity = conf.getSizeAsBytes(ShuffleServerConf.DISK_CAPACITY); double highWaterMarkOfWrite = conf.get(ShuffleServerConf.HIGH_WATER_MARK_OF_WRITE); @@ -129,7 +125,7 @@ public class LocalStorageManager extends SingleStorageManager { if (failedCount > maxFailedNumber || successCount.get() == 0) { throw new RuntimeException( String.format("Initialize %s local storage(s) failed, " - + "specified local storage paths size: %s, the conf of %s size: %s", + + "specified local storage paths size: %s, the conf of %s size: %s", failedCount, localStorageArray.length, LOCAL_STORAGE_INITIALIZE_MAX_FAIL_NUMBER.key(), maxFailedNumber) ); } @@ -139,59 +135,17 @@ public class LocalStorageManager extends SingleStorageManager { StringUtils.join(localStorages.stream().map(LocalStorage::getBasePath).collect(Collectors.toList())) ); this.checker = new LocalStorageChecker(conf, localStorages); + this.selector = new ChainableLocalStorageSelector(localStorages); } @Override public Storage selectStorage(ShuffleDataFlushEvent event) { - String appId = event.getAppId(); - int shuffleId = event.getShuffleId(); - int partitionId = event.getStartPartition(); - - LocalStorage storage = partitionsOfStorage.get(UnionKey.buildKey(appId, shuffleId, partitionId)); - if (storage != null) { - if (storage.isCorrupted()) { - if (storage.containsWriteHandler(appId, shuffleId, partitionId)) { - LOG.error("LocalStorage: {} is corrupted. Switching another storage for event: {}, some data will be lost", - storage.getBasePath(), event); - } - } else { - return storage; - } - } - - List candidates = localStorages - .stream() - .filter(x -> x.canWrite() && !x.isCorrupted()) - .collect(Collectors.toList()); - final LocalStorage selectedStorage = candidates.get( - ShuffleStorageUtils.getStorageIndex( - candidates.size(), - appId, - shuffleId, - partitionId - ) - ); - return partitionsOfStorage.compute( - UnionKey.buildKey(appId, shuffleId, partitionId), - (key, localStorage) -> { - // If this is the first time to select storage or existing storage is corrupted, - // we should refresh the cache. - if (localStorage == null || localStorage.isCorrupted()) { - event.setUnderStorage(selectedStorage); - return selectedStorage; - } - return localStorage; - }); + return selector.selectForWriter(event); } @Override public Storage selectStorage(ShuffleDataReadEvent event) { - String appId = event.getAppId(); - int shuffleId = event.getShuffleId(); - int partitionId = event.getStartPartition(); - - LocalStorage storage = partitionsOfStorage.get(UnionKey.buildKey(appId, shuffleId, partitionId)); - return storage; + return selector.getForReader(event); } @Override @@ -212,7 +166,7 @@ public void removeResources(PurgeEvent event) { List shuffleSet = Optional.ofNullable(event.getShuffleIds()).orElse(Collections.emptyList()); // Remove partitions to storage mapping cache - cleanupStorageSelectionCache(event); + selector.removeCache(event); for (LocalStorage storage : localStorages) { if (event instanceof AppPurgeEvent) { @@ -245,37 +199,6 @@ public void removeResources(PurgeEvent event) { deleteHandler.delete(deletePaths.toArray(new String[deletePaths.size()]), appId, user); } - private void cleanupStorageSelectionCache(PurgeEvent event) { - Function deleteConditionFunc = null; - if (event instanceof AppPurgeEvent) { - deleteConditionFunc = partitionUnionKey -> UnionKey.startsWith(partitionUnionKey, event.getAppId()); - } else if (event instanceof ShufflePurgeEvent) { - deleteConditionFunc = - partitionUnionKey -> UnionKey.startsWith( - partitionUnionKey, - event.getAppId(), - event.getShuffleIds() - ); - } - long startTime = System.currentTimeMillis(); - deleteElement( - partitionsOfStorage, - deleteConditionFunc - ); - LOG.info("Cleaning the storage selection cache costs: {}(ms) for event: {}", - System.currentTimeMillis() - startTime, event); - } - - private void deleteElement(Map map, Function deleteConditionFunc) { - Iterator> iterator = map.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - if (deleteConditionFunc.apply(entry.getKey())) { - iterator.remove(); - } - } - } - @Override public void registerRemoteStorage(String appId, RemoteStorageInfo remoteStorageInfo) { // ignore @@ -295,7 +218,7 @@ public void checkAndClearLeakedShuffleData(Collection appIds) { if (!appIds.contains(appId)) { ShuffleDeleteHandler deleteHandler = ShuffleHandlerFactory.getInstance() .createShuffleDeleteHandler( - new CreateShuffleDeleteHandlerRequest(StorageType.LOCALFILE.name(), new Configuration())); + new CreateShuffleDeleteHandlerRequest(StorageType.LOCALFILE.name(), new Configuration())); String[] deletePaths = new String[storageBasePaths.size()]; for (int i = 0; i < storageBasePaths.size(); i++) { deletePaths[i] = ShuffleStorageUtils.getFullShuffleDataFolder(storageBasePaths.get(i), appId); @@ -308,4 +231,4 @@ public void checkAndClearLeakedShuffleData(Collection appIds) { public List getStorages() { return localStorages; } -} +} \ No newline at end of file diff --git a/server/src/main/java/org/apache/uniffle/server/storage/local/AbstractCacheableStorageSelector.java b/server/src/main/java/org/apache/uniffle/server/storage/local/AbstractCacheableStorageSelector.java new file mode 100644 index 0000000000..2f828f0f93 --- /dev/null +++ b/server/src/main/java/org/apache/uniffle/server/storage/local/AbstractCacheableStorageSelector.java @@ -0,0 +1,10 @@ +package org.apache.uniffle.server.storage.local; + +import org.apache.uniffle.server.event.PurgeEvent; +import org.apache.uniffle.storage.common.Storage; + +public abstract class AbstractCacheableStorageSelector implements StorageSelector { + + public abstract void removeCache(PurgeEvent event); + +} \ No newline at end of file diff --git a/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageSelector.java b/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageSelector.java new file mode 100644 index 0000000000..cf5615fcf0 --- /dev/null +++ b/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageSelector.java @@ -0,0 +1,136 @@ +package org.apache.uniffle.server.storage.local; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.uniffle.common.UnionKey; +import org.apache.uniffle.server.ShuffleDataFlushEvent; +import org.apache.uniffle.server.ShuffleDataReadEvent; +import org.apache.uniffle.server.event.AppPurgeEvent; +import org.apache.uniffle.server.event.PurgeEvent; +import org.apache.uniffle.server.event.ShufflePurgeEvent; +import org.apache.uniffle.storage.common.LocalStorage; +import org.apache.uniffle.storage.common.Storage; +import org.apache.uniffle.storage.util.ShuffleStorageUtils; + +public class ChainableLocalStorageSelector extends AbstractCacheableStorageSelector { + private static final Logger LOGGER = LoggerFactory.getLogger(ChainableLocalStorageSelector.class); + + private final List localStorages; + private final Map storageOfPartitions; + + public ChainableLocalStorageSelector(List localStorages) { + this.localStorages = localStorages; + this.storageOfPartitions = Maps.newConcurrentMap(); + } + + @Override + public Storage selectForWriter(ShuffleDataFlushEvent event) { + String appId = event.getAppId(); + int shuffleId = event.getShuffleId(); + int partitionId = event.getStartPartition(); + + LocalStorageView view = storageOfPartitions.get(getKey(event)); + LocalStorage lastStorage = null; + if (view != null) { + lastStorage = view.getLatest(); + if (lastStorage.isCorrupted()) { + if (lastStorage.containsWriteHandler(appId, shuffleId, partitionId)) { + LOGGER.error("LocalStorage: " + lastStorage.getBasePath() + " is corrupted."); + } + } else { + if (lastStorage.canWrite()) { + return lastStorage; + } + } + } + + List candidates = localStorages + .stream() + .filter(x -> x.canWrite() && !x.isCorrupted()) + .collect(Collectors.toList()); + LocalStorage localStorage = candidates.get( + ShuffleStorageUtils.getStorageIndex( + candidates.size(), + appId, + shuffleId, + partitionId + ) + ); + + LocalStorage finalLastStorage = lastStorage; + storageOfPartitions.compute( + getKey(event), + (key, storageView) -> { + if (storageView == null) { + return new LocalStorageView(localStorage); + } + if (finalLastStorage != null && finalLastStorage.isCorrupted() + && !finalLastStorage.containsWriteHandler(appId, shuffleId, partitionId)) { + storageView.removeTail(); + } + storageView.add(localStorage); + return storageView; + } + ); + event.setUnderStorage(localStorage); + return localStorage; + } + + private String getKey(ShuffleDataFlushEvent event) { + return UnionKey.buildKey( + event.getAppId(), + event.getShuffleId(), + event.getStartPartition() + ); + } + + @Override + public Storage getForReader(ShuffleDataReadEvent event) { + return storageOfPartitions.get(getKey(event)).get(event.getStorageIndex()); + } + + @Override + public void removeCache(PurgeEvent event) { + Function deleteConditionFunc = null; + if (event instanceof AppPurgeEvent) { + deleteConditionFunc = partitionUnionKey -> UnionKey.startsWith(partitionUnionKey, event.getAppId()); + } else if (event instanceof ShufflePurgeEvent) { + deleteConditionFunc = + partitionUnionKey -> UnionKey.startsWith( + partitionUnionKey, + event.getAppId(), + event.getShuffleIds() + ); + } + deleteElement( + storageOfPartitions, + deleteConditionFunc + ); + } + + private void deleteElement(Map map, Function deleteConditionFunc) { + Iterator> iterator = map.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (deleteConditionFunc.apply(entry.getKey())) { + iterator.remove(); + } + } + } + + private String getKey(ShuffleDataReadEvent event) { + return UnionKey.buildKey( + event.getAppId(), + event.getShuffleId(), + event.getStartPartition() + ); + } +} \ No newline at end of file diff --git a/server/src/main/java/org/apache/uniffle/server/storage/local/LocalStorageView.java b/server/src/main/java/org/apache/uniffle/server/storage/local/LocalStorageView.java new file mode 100644 index 0000000000..dfadf0589d --- /dev/null +++ b/server/src/main/java/org/apache/uniffle/server/storage/local/LocalStorageView.java @@ -0,0 +1,32 @@ +package org.apache.uniffle.server.storage.local; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.uniffle.storage.common.LocalStorage; + +public class LocalStorageView { + + private final List localStorages; + + public LocalStorageView(LocalStorage localStorage) { + this.localStorages = new ArrayList<>(); + localStorages.add(localStorage); + } + + public void add(LocalStorage localStorage) { + localStorages.add(localStorage); + } + + public LocalStorage getLatest() { + return localStorages.get(localStorages.size() - 1); + } + + public void removeTail() { + localStorages.remove(localStorages.size() - 1); + } + + public LocalStorage get(int index) { + return localStorages.get(index); + } +} \ No newline at end of file diff --git a/server/src/main/java/org/apache/uniffle/server/storage/local/StorageSelector.java b/server/src/main/java/org/apache/uniffle/server/storage/local/StorageSelector.java new file mode 100644 index 0000000000..751bcd2563 --- /dev/null +++ b/server/src/main/java/org/apache/uniffle/server/storage/local/StorageSelector.java @@ -0,0 +1,12 @@ +package org.apache.uniffle.server.storage.local; + +import org.apache.uniffle.server.ShuffleDataFlushEvent; +import org.apache.uniffle.server.ShuffleDataReadEvent; +import org.apache.uniffle.storage.common.Storage; + +public interface StorageSelector { + + T selectForWriter(ShuffleDataFlushEvent event); + + T getForReader(ShuffleDataReadEvent event); +} \ No newline at end of file diff --git a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java index be42a2154d..af1172125d 100644 --- a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java @@ -31,6 +31,7 @@ import org.apache.uniffle.server.ShuffleDataReadEvent; import org.apache.uniffle.server.ShuffleServerConf; import org.apache.uniffle.server.ShuffleServerMetrics; +import org.apache.uniffle.storage.common.ChainableLocalStorage; import org.apache.uniffle.storage.common.LocalStorage; import org.apache.uniffle.storage.common.Storage; import org.apache.uniffle.storage.util.StorageType; @@ -71,6 +72,55 @@ private ShuffleDataFlushEvent toDataFlushEvent(String appId, int shuffleId, int ); } + @Test + public void testDynamicStorageSelection() { + String[] storagePaths = { + "/tmp/rss-data1", + "/tmp/rss-data2", + "/tmp/rss-data3" + }; + + ShuffleServerConf conf = new ShuffleServerConf(); + conf.set(ShuffleServerConf.RSS_LOCAL_STORAGE_MULTIPLE_DISK_SELECTION_ENABLE, true); + conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(storagePaths)); + conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L); + conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name()); + + LocalStorageManager localStorageManager = new LocalStorageManager(conf); + + String appId = "testDynamicStorageSelection"; + ShuffleDataFlushEvent dataFlushEvent = toDataFlushEvent(appId, 1, 1); + + /** + * case1: normal selection + */ + Storage storage1 = localStorageManager.selectStorage(dataFlushEvent); + assertEquals(((ChainableLocalStorage)storage1).getChainableStorages().size(), 1); + Storage storage2 = localStorageManager.selectStorage(dataFlushEvent); + assertEquals(((ChainableLocalStorage)storage2).getChainableStorages().size(), 1); + assertEquals(storage1, storage2); + + /** + * case2: when one storage can't write, it will choose another storage + */ + List localStorages = ((ChainableLocalStorage)storage1).getChainableStorages(); + // mark its storage full + localStorages.get(localStorages.size() - 1).getMetaData().setSize(1024); + Storage storage3 = localStorageManager.selectStorage(dataFlushEvent); + assertEquals(storage1, storage3); + assertEquals(((ChainableLocalStorage)storage3).getChainableStorages().size(), 2); + localStorages.get(localStorages.size() - 1).getMetaData().setSize(1024); + Storage storage4 = localStorageManager.selectStorage(dataFlushEvent); + assertEquals(((ChainableLocalStorage)storage4).getChainableStorages().size(), 3); + + /** + * case3: when all storages can't write, it will directly return the original tail storage + */ + localStorages.get(localStorages.size() - 1).getMetaData().setSize(1024); + Storage storage5 = localStorageManager.selectStorage(dataFlushEvent); + assertEquals(((ChainableLocalStorage)storage5).getChainableStorages().size(), 3); + } + @Test public void testStorageSelectionWhenReachingHighWatermark() { String[] storagePaths = { @@ -90,7 +140,8 @@ public void testStorageSelectionWhenReachingHighWatermark() { ShuffleDataFlushEvent dataFlushEvent = toDataFlushEvent(appId, 1, 1); Storage storage1 = localStorageManager.selectStorage(dataFlushEvent); - ((LocalStorage) storage1).getMetaData().setSize(999); + List localStorageList = ((ChainableLocalStorage)storage1).getChainableStorages(); + localStorageList.get(localStorageList.size() - 1).getMetaData().setSize(999); localStorageManager = new LocalStorageManager(conf); Storage storage2 = localStorageManager.selectStorage(dataFlushEvent); @@ -127,9 +178,13 @@ public void testStorageSelection() { // case2: one storage is corrupted, and it will switch to other storage at the first time of writing // event of (appId, shuffleId, startPartition) - ((LocalStorage)storage1).markCorrupted(); + LocalStorage localStorage1 = getLatestStorage(storage1); + markCorrupted(storage1); Storage storage4 = localStorageManager.selectStorage(dataFlushEvent1); - assertNotEquals(storage4.getStoragePath(), storage1.getStoragePath()); + assertNotEquals( + getLatestStorage(storage4).getStoragePath(), + localStorage1.getStoragePath() + ); assertEquals(localStorageManager.selectStorage(dataReadEvent), storage4); // case3: one storage is corrupted when it happened after the original event has been written, @@ -151,6 +206,16 @@ public void testStorageSelection() { assertEquals(storage7, storage8); } + private LocalStorage getLatestStorage(Storage storage) { + List localStorageList = ((ChainableLocalStorage)storage).getChainableStorages(); + return localStorageList.get(localStorageList.size() - 1); + } + + private void markCorrupted(Storage storage) { + List localStorageList = ((ChainableLocalStorage)storage).getChainableStorages(); + localStorageList.get(localStorageList.size() - 1).markCorrupted(); + } + @Test public void testInitLocalStorageManager() { String[] storagePaths = {"/tmp/rssdata", "/tmp/rssdata2"}; diff --git a/storage/src/main/java/org/apache/uniffle/storage/common/ChainableLocalStorage.java b/storage/src/main/java/org/apache/uniffle/storage/common/ChainableLocalStorage.java new file mode 100644 index 0000000000..c7ec213e0b --- /dev/null +++ b/storage/src/main/java/org/apache/uniffle/storage/common/ChainableLocalStorage.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.storage.common; + +import java.util.ArrayList; +import java.util.List; +import javax.annotation.Nonnull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.uniffle.storage.handler.api.ServerReadHandler; +import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler; +import org.apache.uniffle.storage.request.CreateShuffleReadHandlerRequest; +import org.apache.uniffle.storage.request.CreateShuffleWriteHandlerRequest; + +public class ChainableLocalStorage extends AbstractStorage { + private static final Logger LOGGER = LoggerFactory.getLogger(ChainableLocalStorage.class); + + private final List chainableStorages; + private int size; + + public ChainableLocalStorage(@Nonnull LocalStorage localStorage) { + this.chainableStorages = new ArrayList<>(); + chainableStorages.add(localStorage); + this.size = 1; + } + + public String getBasePath() { + return chainableStorages.get(size - 1).getBasePath(); + } + + public boolean isCorrupted() { + return chainableStorages.get(size - 1).isCorrupted(); + } + + @Override + ShuffleWriteHandler newWriteHandler(CreateShuffleWriteHandlerRequest request) { + return null; + } + + @Override + public ShuffleWriteHandler getOrCreateWriteHandler(CreateShuffleWriteHandlerRequest request) { + return chainableStorages.get(size - 1).getOrCreateWriteHandler(request); + } + + @Override + public ServerReadHandler getOrCreateReadHandler(CreateShuffleReadHandlerRequest request) { + int index = request.getStorageSeqIndex(); + return chainableStorages.get(index).getOrCreateReadHandler(request); + } + + @Override + protected ServerReadHandler newReadHandler(CreateShuffleReadHandlerRequest request) { + return null; + } + + @Override + public boolean canWrite() { + return chainableStorages.get(size - 1).canWrite(); + } + + @Override + public boolean lockShuffleShared(String shuffleKey) { + return chainableStorages.get(size - 1).lockShuffleShared(shuffleKey); + } + + @Override + public boolean unlockShuffleShared(String shuffleKey) { + return chainableStorages.get(size - 1).unlockShuffleShared(shuffleKey); + } + + @Override + public void updateWriteMetrics(StorageWriteMetrics metrics) { + chainableStorages.get(size - 1).updateWriteMetrics(metrics); + } + + @Override + public void updateReadMetrics(StorageReadMetrics metrics) { + + } + + @Override + public void createMetadataIfNotExist(String shuffleKey) { + chainableStorages.get(size - 1).createMetadataIfNotExist(shuffleKey); + } + + @Override + public String getStoragePath() { + return chainableStorages.get(size - 1).getStoragePath(); + } + + @Override + public String getStorageHost() { + return chainableStorages.get(size - 1).getStorageHost(); + } + + public void removeTailStorage() { + this.chainableStorages.remove(size - 1); + this.size -= 1; + } + + public List getChainableStorages() { + return chainableStorages; + } + + public void switchTo(LocalStorage newLocalStorage) { + // If it's the used storage, it should be switched as the latest local storage to write. + if (chainableStorages.contains(newLocalStorage)) { + chainableStorages.remove(newLocalStorage); + } + chainableStorages.add(newLocalStorage); + } +} diff --git a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java index 912534823a..7cb997c908 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java +++ b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java @@ -36,7 +36,7 @@ import org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler; import org.apache.uniffle.storage.handler.impl.HdfsClientReadHandler; import org.apache.uniffle.storage.handler.impl.HdfsShuffleDeleteHandler; -import org.apache.uniffle.storage.handler.impl.LocalFileClientReadHandler; +import org.apache.uniffle.storage.handler.impl.LocalFileClientReadMultiFileHandler; import org.apache.uniffle.storage.handler.impl.LocalFileDeleteHandler; import org.apache.uniffle.storage.handler.impl.MemoryClientReadHandler; import org.apache.uniffle.storage.handler.impl.MultiReplicaClientReadHandler; @@ -140,7 +140,7 @@ private ClientReadHandler getLocalfileClientReaderHandler(CreateShuffleReadHandl ShuffleServerInfo ssi) { ShuffleServerClient shuffleServerClient = ShuffleServerClientFactory.getInstance().getShuffleServerClient( ClientType.GRPC.name(), ssi); - return new LocalFileClientReadHandler( + return new LocalFileClientReadMultiFileHandler( request.getAppId(), request.getShuffleId(), request.getPartitionId(), request.getIndexReadLimit(), request.getPartitionNumPerRange(), request.getPartitionNum(), request.getReadBufferSize(), request.getExpectBlockIds(), request.getProcessBlockIds(), diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java index a2d9694021..09bba08543 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java +++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java @@ -37,6 +37,31 @@ public class LocalFileClientReadHandler extends DataSkippableReadHandler { private final int partitionNumPerRange; private final int partitionNum; private ShuffleServerClient shuffleServerClient; + private int storageIndex = 0; + + public LocalFileClientReadHandler( + String appId, + int shuffleId, + int partitionId, + int indexReadLimit, + int partitionNumPerRange, + int partitionNum, + int readBufferSize, + Roaring64NavigableMap expectBlockIds, + Roaring64NavigableMap processBlockIds, + ShuffleServerClient shuffleServerClient, + ShuffleDataDistributionType distributionType, + Roaring64NavigableMap expectTaskIds, + int storageIndex) { + super( + appId, shuffleId, partitionId, readBufferSize, expectBlockIds, + processBlockIds, distributionType, expectTaskIds + ); + this.shuffleServerClient = shuffleServerClient; + this.partitionNumPerRange = partitionNumPerRange; + this.partitionNum = partitionNum; + this.storageIndex = storageIndex; + } public LocalFileClientReadHandler( String appId, @@ -85,7 +110,7 @@ public LocalFileClientReadHandler( public ShuffleIndexResult readShuffleIndex() { ShuffleIndexResult shuffleIndexResult = null; RssGetShuffleIndexRequest request = new RssGetShuffleIndexRequest( - appId, shuffleId, partitionId, partitionNumPerRange, partitionNum); + appId, shuffleId, partitionId, partitionNumPerRange, partitionNum, storageIndex); try { shuffleIndexResult = shuffleServerClient.getShuffleIndex(request).getShuffleIndexResult(); } catch (Exception e) { @@ -106,7 +131,7 @@ public ShuffleDataResult readShuffleData(ShuffleDataSegment shuffleDataSegment) } RssGetShuffleDataRequest request = new RssGetShuffleDataRequest( appId, shuffleId, partitionId, partitionNumPerRange, partitionNum, - shuffleDataSegment.getOffset(), expectedLength); + shuffleDataSegment.getOffset(), expectedLength, storageIndex); try { RssGetShuffleDataResponse response = shuffleServerClient.getShuffleData(request); result = new ShuffleDataResult(response.getShuffleData(), shuffleDataSegment.getBufferSegments()); diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadMultiFileHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadMultiFileHandler.java new file mode 100644 index 0000000000..b57f7bd753 --- /dev/null +++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadMultiFileHandler.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.storage.handler.impl; + +import org.roaringbitmap.longlong.Roaring64NavigableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.uniffle.client.api.ShuffleServerClient; +import org.apache.uniffle.common.ShuffleDataDistributionType; +import org.apache.uniffle.common.ShuffleDataResult; + +public class LocalFileClientReadMultiFileHandler extends AbstractClientReadHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(LocalFileClientReadMultiFileHandler.class); + + private int storageIndex = 0; + private LocalFileClientReadHandler readHandler; + private int currentHandlerReadCount = 0; + + private String appId; + private int shuffleId; + private int partitionId; + private int indexReadLimit; + private int partitionNumPerRange; + private int partitionNum; + private int readBufferSize; + private Roaring64NavigableMap expectBlockIds; + private Roaring64NavigableMap processBlockIds; + private ShuffleServerClient shuffleServerClient; + private ShuffleDataDistributionType distributionType; + private Roaring64NavigableMap expectTaskIds; + + public LocalFileClientReadMultiFileHandler( + String appId, + int shuffleId, + int partitionId, + int indexReadLimit, + int partitionNumPerRange, + int partitionNum, + int readBufferSize, + Roaring64NavigableMap expectBlockIds, + Roaring64NavigableMap processBlockIds, + ShuffleServerClient shuffleServerClient, + ShuffleDataDistributionType distributionType, + Roaring64NavigableMap expectTaskIds) { + this.appId = appId; + this.shuffleId = shuffleId; + this.partitionId = partitionId; + this.indexReadLimit = indexReadLimit; + this.partitionNumPerRange = partitionNumPerRange; + this.partitionNum = partitionNum; + this.readBufferSize = readBufferSize; + this.expectBlockIds = expectBlockIds; + this.processBlockIds = processBlockIds; + this.shuffleServerClient = shuffleServerClient; + this.distributionType = distributionType; + this.expectTaskIds = expectTaskIds; + } + + @Override + public ShuffleDataResult readShuffleData() { + /** + * step1: read shuffle index from storage-{i} + * step2: read shuffle data from storage-{i} + * step3: if shuffle data is null, then read next index from storage-{i+1}. + * - if empty, return. + * - else, return step1 + */ + + if (readHandler == null) { + LOGGER.info("Initializing local file client read handler with storage index: {}", storageIndex); + this.readHandler = initHandler(storageIndex); + } + + ShuffleDataResult shuffleDataResult = readHandler.readShuffleData(); + + if (shuffleDataResult == null && currentHandlerReadCount == 0) { + return null; + } + + if (shuffleDataResult == null) { + readHandler.close(); + storageIndex += 1; + this.readHandler = initHandler(storageIndex); + shuffleDataResult = readHandler.readShuffleData(); + if (shuffleDataResult == null) { + return null; + } + LOGGER.info("Switched next local file client read handler with storage index: {}", storageIndex); + } + + currentHandlerReadCount++; + return shuffleDataResult; + } + + @Override + public void close() { + if (readHandler != null) { + readHandler.close(); + } + } + + private LocalFileClientReadHandler initHandler(int storageIndex) { + LocalFileClientReadHandler handler = new LocalFileClientReadHandler( + appId, + shuffleId, + partitionId, + indexReadLimit, + partitionNumPerRange, + partitionNum, + readBufferSize, + expectBlockIds, + processBlockIds, + shuffleServerClient, + distributionType, + expectTaskIds, + storageIndex + ); + return handler; + } +} diff --git a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java index 0801893a79..880b2cd5f8 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java +++ b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java @@ -46,6 +46,7 @@ public class CreateShuffleReadHandlerRequest { private ShuffleDataDistributionType distributionType; private Roaring64NavigableMap expectTaskIds; private boolean expectedTaskIdsBitmapFilterEnable; + private int storageSeqIndex; private IdHelper idHelper; @@ -195,4 +196,12 @@ public IdHelper getIdHelper() { public void setIdHelper(IdHelper idHelper) { this.idHelper = idHelper; } + + public int getStorageSeqIndex() { + return storageSeqIndex; + } + + public void setStorageSeqIndex(int storageSeqIndex) { + this.storageSeqIndex = storageSeqIndex; + } } From 6785e6ce5a4b6ba781017e6bd7f37bff7af62a15 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Thu, 22 Dec 2022 12:50:20 +0800 Subject: [PATCH 2/9] disable multiple disk selection by default --- .../uniffle/common/config/RssBaseConf.java | 6 - .../uniffle/server/ShuffleServerConf.java | 6 + .../server/storage/LocalStorageManager.java | 2 +- .../local/ChainableLocalStorageSelector.java | 44 ++++-- .../storage/LocalStorageManagerTest.java | 72 ++++++---- .../storage/common/ChainableLocalStorage.java | 129 ------------------ 6 files changed, 80 insertions(+), 179 deletions(-) delete mode 100644 storage/src/main/java/org/apache/uniffle/storage/common/ChainableLocalStorage.java diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java index 8c4e28d9e9..1d794fb3bc 100644 --- a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java +++ b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java @@ -147,12 +147,6 @@ public class RssBaseConf extends RssConf { .noDefaultValue() .withDescription("Common storage path for remote shuffle data"); - public static final ConfigOption RSS_LOCAL_STORAGE_MULTIPLE_DISK_SELECTION_ENABLE = ConfigOptions - .key("rss.storage.multiple.disk.selection.enable") - .booleanType() - .defaultValue(false) - .withDescription(""); - public static final ConfigOption RPC_EXECUTOR_SIZE = ConfigOptions .key("rss.rpc.executor.size") .intType() diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java index 384545986c..9967ecda0d 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java @@ -321,6 +321,12 @@ public class ShuffleServerConf extends RssBaseConf { .withDescription("The max concurrency of single partition writer, the data partition file number is " + "equal to this value. Default value is 1."); + public static final ConfigOption RSS_LOCAL_STORAGE_MULTIPLE_DISK_SELECTION_ENABLE = ConfigOptions + .key("rss.server.localstorage.multiple.disk.selection.enable") + .booleanType() + .defaultValue(false) + .withDescription(""); + public ShuffleServerConf() { } diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java index aefbeea050..4c81fc6580 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java @@ -135,7 +135,7 @@ public class LocalStorageManager extends SingleStorageManager { StringUtils.join(localStorages.stream().map(LocalStorage::getBasePath).collect(Collectors.toList())) ); this.checker = new LocalStorageChecker(conf, localStorages); - this.selector = new ChainableLocalStorageSelector(localStorages); + this.selector = new ChainableLocalStorageSelector(conf, localStorages); } @Override diff --git a/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageSelector.java b/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageSelector.java index cf5615fcf0..7e7b072a24 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageSelector.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageSelector.java @@ -11,8 +11,10 @@ import org.slf4j.LoggerFactory; import org.apache.uniffle.common.UnionKey; +import org.apache.uniffle.common.exception.FileNotFoundException; import org.apache.uniffle.server.ShuffleDataFlushEvent; import org.apache.uniffle.server.ShuffleDataReadEvent; +import org.apache.uniffle.server.ShuffleServerConf; import org.apache.uniffle.server.event.AppPurgeEvent; import org.apache.uniffle.server.event.PurgeEvent; import org.apache.uniffle.server.event.ShufflePurgeEvent; @@ -20,15 +22,19 @@ import org.apache.uniffle.storage.common.Storage; import org.apache.uniffle.storage.util.ShuffleStorageUtils; +import static org.apache.uniffle.server.ShuffleServerConf.RSS_LOCAL_STORAGE_MULTIPLE_DISK_SELECTION_ENABLE; + public class ChainableLocalStorageSelector extends AbstractCacheableStorageSelector { private static final Logger LOGGER = LoggerFactory.getLogger(ChainableLocalStorageSelector.class); private final List localStorages; private final Map storageOfPartitions; + private final boolean multipleDiskSelectionEnable; - public ChainableLocalStorageSelector(List localStorages) { + public ChainableLocalStorageSelector(ShuffleServerConf shuffleServerConf, List localStorages) { this.localStorages = localStorages; this.storageOfPartitions = Maps.newConcurrentMap(); + this.multipleDiskSelectionEnable = shuffleServerConf.get(RSS_LOCAL_STORAGE_MULTIPLE_DISK_SELECTION_ENABLE); } @Override @@ -43,20 +49,25 @@ public Storage selectForWriter(ShuffleDataFlushEvent event) { lastStorage = view.getLatest(); if (lastStorage.isCorrupted()) { if (lastStorage.containsWriteHandler(appId, shuffleId, partitionId)) { - LOGGER.error("LocalStorage: " + lastStorage.getBasePath() + " is corrupted."); + LOGGER.error("LocalStorage: {} is corrupted. Switching another storage for event: {}, " + + "some data will be lost", lastStorage.getBasePath(), event); } } else { - if (lastStorage.canWrite()) { + if (!multipleDiskSelectionEnable || lastStorage.canWrite()) { return lastStorage; } } } + // todo: support pluggable selection policy, hash-based or free-space based List candidates = localStorages .stream() .filter(x -> x.canWrite() && !x.isCorrupted()) .collect(Collectors.toList()); - LocalStorage localStorage = candidates.get( + if (candidates.isEmpty()) { + throw new RuntimeException("No available local storages."); + } + final LocalStorage selected = candidates.get( ShuffleStorageUtils.getStorageIndex( candidates.size(), appId, @@ -65,23 +76,26 @@ public Storage selectForWriter(ShuffleDataFlushEvent event) { ) ); - LocalStorage finalLastStorage = lastStorage; + final LocalStorage previousStorage = lastStorage; storageOfPartitions.compute( getKey(event), (key, storageView) -> { if (storageView == null) { - return new LocalStorageView(localStorage); + return new LocalStorageView(selected); } - if (finalLastStorage != null && finalLastStorage.isCorrupted() - && !finalLastStorage.containsWriteHandler(appId, shuffleId, partitionId)) { - storageView.removeTail(); + // If the storage is corrupted, it should be removed from the stoarge view. + if (previousStorage != null && previousStorage.isCorrupted()) { + LocalStorage currentTailStorage = storageView.getLatest(); + if (previousStorage == currentTailStorage) { + storageView.removeTail(); + } } - storageView.add(localStorage); + storageView.add(selected); return storageView; } ); - event.setUnderStorage(localStorage); - return localStorage; + event.setUnderStorage(selected); + return selected; } private String getKey(ShuffleDataFlushEvent event) { @@ -94,7 +108,11 @@ private String getKey(ShuffleDataFlushEvent event) { @Override public Storage getForReader(ShuffleDataReadEvent event) { - return storageOfPartitions.get(getKey(event)).get(event.getStorageIndex()); + try { + return storageOfPartitions.get(getKey(event)).get(event.getStorageIndex()); + } catch (IndexOutOfBoundsException exception) { + throw new FileNotFoundException("No such local storage for event: " + event); + } } @Override diff --git a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java index af1172125d..2f9f952f1f 100644 --- a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java @@ -27,15 +27,16 @@ import org.junit.jupiter.api.Test; import org.apache.uniffle.common.ShufflePartitionedBlock; +import org.apache.uniffle.common.exception.FileNotFoundException; import org.apache.uniffle.server.ShuffleDataFlushEvent; import org.apache.uniffle.server.ShuffleDataReadEvent; import org.apache.uniffle.server.ShuffleServerConf; import org.apache.uniffle.server.ShuffleServerMetrics; -import org.apache.uniffle.storage.common.ChainableLocalStorage; import org.apache.uniffle.storage.common.LocalStorage; import org.apache.uniffle.storage.common.Storage; import org.apache.uniffle.storage.util.StorageType; +import static org.apache.uniffle.server.ShuffleServerConf.RSS_LOCAL_STORAGE_MULTIPLE_DISK_SELECTION_ENABLE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -81,7 +82,7 @@ public void testDynamicStorageSelection() { }; ShuffleServerConf conf = new ShuffleServerConf(); - conf.set(ShuffleServerConf.RSS_LOCAL_STORAGE_MULTIPLE_DISK_SELECTION_ENABLE, true); + conf.set(RSS_LOCAL_STORAGE_MULTIPLE_DISK_SELECTION_ENABLE, true); conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(storagePaths)); conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L); conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name()); @@ -95,30 +96,34 @@ public void testDynamicStorageSelection() { * case1: normal selection */ Storage storage1 = localStorageManager.selectStorage(dataFlushEvent); - assertEquals(((ChainableLocalStorage)storage1).getChainableStorages().size(), 1); Storage storage2 = localStorageManager.selectStorage(dataFlushEvent); - assertEquals(((ChainableLocalStorage)storage2).getChainableStorages().size(), 1); assertEquals(storage1, storage2); /** * case2: when one storage can't write, it will choose another storage */ - List localStorages = ((ChainableLocalStorage)storage1).getChainableStorages(); // mark its storage full - localStorages.get(localStorages.size() - 1).getMetaData().setSize(1024); + ((LocalStorage)storage1).getMetaData().setSize(1024); Storage storage3 = localStorageManager.selectStorage(dataFlushEvent); - assertEquals(storage1, storage3); - assertEquals(((ChainableLocalStorage)storage3).getChainableStorages().size(), 2); - localStorages.get(localStorages.size() - 1).getMetaData().setSize(1024); - Storage storage4 = localStorageManager.selectStorage(dataFlushEvent); - assertEquals(((ChainableLocalStorage)storage4).getChainableStorages().size(), 3); + assertNotEquals(storage1, storage3); /** - * case3: when all storages can't write, it will directly return the original tail storage + * case3: when all storages can't write, it will directly throw exception to make event fail. */ - localStorages.get(localStorages.size() - 1).getMetaData().setSize(1024); - Storage storage5 = localStorageManager.selectStorage(dataFlushEvent); - assertEquals(((ChainableLocalStorage)storage5).getChainableStorages().size(), 3); + ((LocalStorage)storage3).getMetaData().setSize(1024); + Storage storage4 = localStorageManager.selectStorage(dataFlushEvent); + assertNotEquals(storage1, storage4); + assertNotEquals(storage3, storage4); + // This data flush event's view has 3 local storages. + localStorageManager.getStorages().stream().forEach(localStorage -> { + localStorage.getMetaData().setSize(1024); + }); + try { + Storage storage5 = localStorageManager.selectStorage(dataFlushEvent); + fail(); + } catch (Exception e) { + // ignore + } } @Test @@ -130,6 +135,7 @@ public void testStorageSelectionWhenReachingHighWatermark() { }; ShuffleServerConf conf = new ShuffleServerConf(); + conf.set(RSS_LOCAL_STORAGE_MULTIPLE_DISK_SELECTION_ENABLE, true); conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(storagePaths)); conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L); conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name()); @@ -139,13 +145,26 @@ public void testStorageSelectionWhenReachingHighWatermark() { String appId = "testStorageSelectionWhenReachingHighWatermark"; ShuffleDataFlushEvent dataFlushEvent = toDataFlushEvent(appId, 1, 1); Storage storage1 = localStorageManager.selectStorage(dataFlushEvent); - - List localStorageList = ((ChainableLocalStorage)storage1).getChainableStorages(); - localStorageList.get(localStorageList.size() - 1).getMetaData().setSize(999); - localStorageManager = new LocalStorageManager(conf); Storage storage2 = localStorageManager.selectStorage(dataFlushEvent); + assertEquals(storage1, storage2); - assertNotEquals(storage1, storage2); + ((LocalStorage)storage1).getMetaData().setSize(999); + Storage storage3 = localStorageManager.selectStorage(dataFlushEvent); + assertNotEquals(storage1, storage3); + + // Select storage for read event + ShuffleDataReadEvent dataReadEvent1 = new ShuffleDataReadEvent(appId, 1, 1, 1, 0); + assertEquals(storage1, localStorageManager.selectStorage(dataReadEvent1)); + ShuffleDataReadEvent dataReadEvent2 = new ShuffleDataReadEvent(appId, 1, 1, 1, 1); + assertEquals(storage3, localStorageManager.selectStorage(dataReadEvent2)); + // if selecting storage for out-of-range storage id, it will throw FileNotFound exception. + ShuffleDataReadEvent dataReadEvent3 = new ShuffleDataReadEvent(appId, 1, 1, 1, 2); + try { + localStorageManager.selectStorage(dataReadEvent3); + fail(); + } catch (FileNotFoundException exception) { + // ignore + } } @Test @@ -178,12 +197,11 @@ public void testStorageSelection() { // case2: one storage is corrupted, and it will switch to other storage at the first time of writing // event of (appId, shuffleId, startPartition) - LocalStorage localStorage1 = getLatestStorage(storage1); markCorrupted(storage1); Storage storage4 = localStorageManager.selectStorage(dataFlushEvent1); assertNotEquals( - getLatestStorage(storage4).getStoragePath(), - localStorage1.getStoragePath() + storage4.getStoragePath(), + storage1.getStoragePath() ); assertEquals(localStorageManager.selectStorage(dataReadEvent), storage4); @@ -206,14 +224,8 @@ public void testStorageSelection() { assertEquals(storage7, storage8); } - private LocalStorage getLatestStorage(Storage storage) { - List localStorageList = ((ChainableLocalStorage)storage).getChainableStorages(); - return localStorageList.get(localStorageList.size() - 1); - } - private void markCorrupted(Storage storage) { - List localStorageList = ((ChainableLocalStorage)storage).getChainableStorages(); - localStorageList.get(localStorageList.size() - 1).markCorrupted(); + ((LocalStorage)storage).markCorrupted(); } @Test diff --git a/storage/src/main/java/org/apache/uniffle/storage/common/ChainableLocalStorage.java b/storage/src/main/java/org/apache/uniffle/storage/common/ChainableLocalStorage.java deleted file mode 100644 index c7ec213e0b..0000000000 --- a/storage/src/main/java/org/apache/uniffle/storage/common/ChainableLocalStorage.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.uniffle.storage.common; - -import java.util.ArrayList; -import java.util.List; -import javax.annotation.Nonnull; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.uniffle.storage.handler.api.ServerReadHandler; -import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler; -import org.apache.uniffle.storage.request.CreateShuffleReadHandlerRequest; -import org.apache.uniffle.storage.request.CreateShuffleWriteHandlerRequest; - -public class ChainableLocalStorage extends AbstractStorage { - private static final Logger LOGGER = LoggerFactory.getLogger(ChainableLocalStorage.class); - - private final List chainableStorages; - private int size; - - public ChainableLocalStorage(@Nonnull LocalStorage localStorage) { - this.chainableStorages = new ArrayList<>(); - chainableStorages.add(localStorage); - this.size = 1; - } - - public String getBasePath() { - return chainableStorages.get(size - 1).getBasePath(); - } - - public boolean isCorrupted() { - return chainableStorages.get(size - 1).isCorrupted(); - } - - @Override - ShuffleWriteHandler newWriteHandler(CreateShuffleWriteHandlerRequest request) { - return null; - } - - @Override - public ShuffleWriteHandler getOrCreateWriteHandler(CreateShuffleWriteHandlerRequest request) { - return chainableStorages.get(size - 1).getOrCreateWriteHandler(request); - } - - @Override - public ServerReadHandler getOrCreateReadHandler(CreateShuffleReadHandlerRequest request) { - int index = request.getStorageSeqIndex(); - return chainableStorages.get(index).getOrCreateReadHandler(request); - } - - @Override - protected ServerReadHandler newReadHandler(CreateShuffleReadHandlerRequest request) { - return null; - } - - @Override - public boolean canWrite() { - return chainableStorages.get(size - 1).canWrite(); - } - - @Override - public boolean lockShuffleShared(String shuffleKey) { - return chainableStorages.get(size - 1).lockShuffleShared(shuffleKey); - } - - @Override - public boolean unlockShuffleShared(String shuffleKey) { - return chainableStorages.get(size - 1).unlockShuffleShared(shuffleKey); - } - - @Override - public void updateWriteMetrics(StorageWriteMetrics metrics) { - chainableStorages.get(size - 1).updateWriteMetrics(metrics); - } - - @Override - public void updateReadMetrics(StorageReadMetrics metrics) { - - } - - @Override - public void createMetadataIfNotExist(String shuffleKey) { - chainableStorages.get(size - 1).createMetadataIfNotExist(shuffleKey); - } - - @Override - public String getStoragePath() { - return chainableStorages.get(size - 1).getStoragePath(); - } - - @Override - public String getStorageHost() { - return chainableStorages.get(size - 1).getStorageHost(); - } - - public void removeTailStorage() { - this.chainableStorages.remove(size - 1); - this.size -= 1; - } - - public List getChainableStorages() { - return chainableStorages; - } - - public void switchTo(LocalStorage newLocalStorage) { - // If it's the used storage, it should be switched as the latest local storage to write. - if (chainableStorages.contains(newLocalStorage)) { - chainableStorages.remove(newLocalStorage); - } - chainableStorages.add(newLocalStorage); - } -} From cd37c7ff913005938c3e17b8cc3700492a22aa68 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Thu, 22 Dec 2022 12:59:49 +0800 Subject: [PATCH 3/9] checkstyle fix --- .../uniffle/server/ShuffleDataReadEvent.java | 7 ++++++- .../server/storage/LocalStorageManager.java | 2 +- .../AbstractCacheableStorageSelector.java | 19 ++++++++++++++++++- .../local/ChainableLocalStorageSelector.java | 19 ++++++++++++++++++- .../storage/local/LocalStorageView.java | 19 ++++++++++++++++++- .../server/storage/local/StorageSelector.java | 19 ++++++++++++++++++- 6 files changed, 79 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleDataReadEvent.java b/server/src/main/java/org/apache/uniffle/server/ShuffleDataReadEvent.java index 55d8204522..fdab707840 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleDataReadEvent.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleDataReadEvent.java @@ -27,7 +27,12 @@ public class ShuffleDataReadEvent { private int startPartition; private int storageIndex = 0; - public ShuffleDataReadEvent(String appId, int shuffleId, int partitionId, int startPartitionOfRange, int storageIndex) { + public ShuffleDataReadEvent( + String appId, + int shuffleId, + int partitionId, + int startPartitionOfRange, + int storageIndex) { this.appId = appId; this.shuffleId = shuffleId; this.partitionId = partitionId; diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java index 4c81fc6580..cd1976679d 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java @@ -231,4 +231,4 @@ public void checkAndClearLeakedShuffleData(Collection appIds) { public List getStorages() { return localStorages; } -} \ No newline at end of file +} diff --git a/server/src/main/java/org/apache/uniffle/server/storage/local/AbstractCacheableStorageSelector.java b/server/src/main/java/org/apache/uniffle/server/storage/local/AbstractCacheableStorageSelector.java index 2f828f0f93..dc16ac89fe 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/local/AbstractCacheableStorageSelector.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/local/AbstractCacheableStorageSelector.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.uniffle.server.storage.local; import org.apache.uniffle.server.event.PurgeEvent; @@ -7,4 +24,4 @@ public abstract class AbstractCacheableStorageSelector implements StorageSelecto public abstract void removeCache(PurgeEvent event); -} \ No newline at end of file +} diff --git a/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageSelector.java b/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageSelector.java index 7e7b072a24..4a462b8a05 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageSelector.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageSelector.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.uniffle.server.storage.local; import java.util.Iterator; @@ -151,4 +168,4 @@ private String getKey(ShuffleDataReadEvent event) { event.getStartPartition() ); } -} \ No newline at end of file +} diff --git a/server/src/main/java/org/apache/uniffle/server/storage/local/LocalStorageView.java b/server/src/main/java/org/apache/uniffle/server/storage/local/LocalStorageView.java index dfadf0589d..dd11b9fe33 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/local/LocalStorageView.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/local/LocalStorageView.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.uniffle.server.storage.local; import java.util.ArrayList; @@ -29,4 +46,4 @@ public void removeTail() { public LocalStorage get(int index) { return localStorages.get(index); } -} \ No newline at end of file +} diff --git a/server/src/main/java/org/apache/uniffle/server/storage/local/StorageSelector.java b/server/src/main/java/org/apache/uniffle/server/storage/local/StorageSelector.java index 751bcd2563..a77ba16469 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/local/StorageSelector.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/local/StorageSelector.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.uniffle.server.storage.local; import org.apache.uniffle.server.ShuffleDataFlushEvent; @@ -9,4 +26,4 @@ public interface StorageSelector { T selectForWriter(ShuffleDataFlushEvent event); T getForReader(ShuffleDataReadEvent event); -} \ No newline at end of file +} From 6ae7f4c6922c5873690c9dcbd8e092a68c1b507b Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Thu, 22 Dec 2022 13:39:14 +0800 Subject: [PATCH 4/9] Return null when getting storage for reader --- .../uniffle/server/ShuffleServerGrpcService.java | 4 ++-- .../storage/local/ChainableLocalStorageSelector.java | 10 +++++++--- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java index cf82ce4574..df6d8a24c2 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java @@ -517,7 +517,7 @@ public void getLocalShuffleData(GetLocalShuffleDataRequest request, Storage storage = shuffleServer .getStorageManager() .selectStorage( - new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0]) + new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0], request.getStorageId()) ); if (storage != null) { storage.updateReadMetrics(new StorageReadMetrics(appId, shuffleId)); @@ -584,7 +584,7 @@ public void getLocalShuffleIndex(GetLocalShuffleIndexRequest request, int[] range = ShuffleStorageUtils.getPartitionRange(partitionId, partitionNumPerRange, partitionNum); Storage storage = shuffleServer.getStorageManager() - .selectStorage(new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0])); + .selectStorage(new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0], request.getStorageId())); if (storage != null) { storage.updateReadMetrics(new StorageReadMetrics(appId, shuffleId)); } diff --git a/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageSelector.java b/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageSelector.java index 4a462b8a05..68e22ba4ef 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageSelector.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageSelector.java @@ -28,7 +28,6 @@ import org.slf4j.LoggerFactory; import org.apache.uniffle.common.UnionKey; -import org.apache.uniffle.common.exception.FileNotFoundException; import org.apache.uniffle.server.ShuffleDataFlushEvent; import org.apache.uniffle.server.ShuffleDataReadEvent; import org.apache.uniffle.server.ShuffleServerConf; @@ -126,9 +125,14 @@ private String getKey(ShuffleDataFlushEvent event) { @Override public Storage getForReader(ShuffleDataReadEvent event) { try { - return storageOfPartitions.get(getKey(event)).get(event.getStorageIndex()); + LocalStorageView view = storageOfPartitions.get(getKey(event)); + if (view == null) { + return null; + } + return view.get(event.getStorageIndex()); } catch (IndexOutOfBoundsException exception) { - throw new FileNotFoundException("No such local storage for event: " + event); + LOGGER.error("No such local storage for event: " + event); + return null; } } From d1dace90d803fe4d3fb551e7e9e958c1eee77ef0 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Thu, 22 Dec 2022 13:43:40 +0800 Subject: [PATCH 5/9] minor optimize --- .../local/ChainableLocalStorageSelector.java | 33 ++++++++----------- 1 file changed, 14 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageSelector.java b/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageSelector.java index 68e22ba4ef..fed530405e 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageSelector.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageSelector.java @@ -59,7 +59,12 @@ public Storage selectForWriter(ShuffleDataFlushEvent event) { int shuffleId = event.getShuffleId(); int partitionId = event.getStartPartition(); - LocalStorageView view = storageOfPartitions.get(getKey(event)); + String cacheKey = UnionKey.buildKey( + event.getAppId(), + event.getShuffleId(), + event.getStartPartition() + ); + LocalStorageView view = storageOfPartitions.get(cacheKey); LocalStorage lastStorage = null; if (view != null) { lastStorage = view.getLatest(); @@ -94,7 +99,7 @@ public Storage selectForWriter(ShuffleDataFlushEvent event) { final LocalStorage previousStorage = lastStorage; storageOfPartitions.compute( - getKey(event), + cacheKey, (key, storageView) -> { if (storageView == null) { return new LocalStorageView(selected); @@ -114,18 +119,16 @@ public Storage selectForWriter(ShuffleDataFlushEvent event) { return selected; } - private String getKey(ShuffleDataFlushEvent event) { - return UnionKey.buildKey( - event.getAppId(), - event.getShuffleId(), - event.getStartPartition() - ); - } - @Override public Storage getForReader(ShuffleDataReadEvent event) { try { - LocalStorageView view = storageOfPartitions.get(getKey(event)); + LocalStorageView view = storageOfPartitions.get( + UnionKey.buildKey( + event.getAppId(), + event.getShuffleId(), + event.getStartPartition() + ) + ); if (view == null) { return null; } @@ -164,12 +167,4 @@ private void deleteElement(Map map, Function deleteCond } } } - - private String getKey(ShuffleDataReadEvent event) { - return UnionKey.buildKey( - event.getAppId(), - event.getShuffleId(), - event.getStartPartition() - ); - } } From ed0bc0d2665c4563ffd38bbdf261ab0b4dd8d3bb Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Thu, 22 Dec 2022 13:55:13 +0800 Subject: [PATCH 6/9] fix null check --- .../test/ShuffleClientWithLocalMultiDiskTest.java | 5 +++++ .../server/storage/LocalStorageManagerTest.java | 11 +++-------- 2 files changed, 8 insertions(+), 8 deletions(-) create mode 100644 integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleClientWithLocalMultiDiskTest.java diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleClientWithLocalMultiDiskTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleClientWithLocalMultiDiskTest.java new file mode 100644 index 0000000000..49951a346c --- /dev/null +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleClientWithLocalMultiDiskTest.java @@ -0,0 +1,5 @@ +package org.apache.uniffle.test;/** + * @author zhangjunfan + * @date 2022/12/22 + */public class ShuffleClientWithLocalMultiDiskTest { +} diff --git a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java index 2f9f952f1f..24aaa998c8 100644 --- a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java @@ -27,7 +27,6 @@ import org.junit.jupiter.api.Test; import org.apache.uniffle.common.ShufflePartitionedBlock; -import org.apache.uniffle.common.exception.FileNotFoundException; import org.apache.uniffle.server.ShuffleDataFlushEvent; import org.apache.uniffle.server.ShuffleDataReadEvent; import org.apache.uniffle.server.ShuffleServerConf; @@ -40,6 +39,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -157,14 +157,9 @@ public void testStorageSelectionWhenReachingHighWatermark() { assertEquals(storage1, localStorageManager.selectStorage(dataReadEvent1)); ShuffleDataReadEvent dataReadEvent2 = new ShuffleDataReadEvent(appId, 1, 1, 1, 1); assertEquals(storage3, localStorageManager.selectStorage(dataReadEvent2)); - // if selecting storage for out-of-range storage id, it will throw FileNotFound exception. + // if selecting storage for out-of-range storage id, it will return null ShuffleDataReadEvent dataReadEvent3 = new ShuffleDataReadEvent(appId, 1, 1, 1, 2); - try { - localStorageManager.selectStorage(dataReadEvent3); - fail(); - } catch (FileNotFoundException exception) { - // ignore - } + assertNull(localStorageManager.selectStorage(dataReadEvent3)); } @Test From 689d61b9a33cf65849a5003d164d4d5daf9fb7ce Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Thu, 22 Dec 2022 15:16:25 +0800 Subject: [PATCH 7/9] Add tests about multiple disk selection --- .../ShuffleClientWithLocalMultiDiskTest.java | 188 +++++++++++++++++- .../server/ShuffleServerGrpcService.java | 6 +- .../uniffle/server/ShuffleTaskManager.java | 10 +- .../impl/LocalFileClientReadHandler.java | 3 +- .../CreateShuffleReadHandlerRequest.java | 10 - 5 files changed, 196 insertions(+), 21 deletions(-) diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleClientWithLocalMultiDiskTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleClientWithLocalMultiDiskTest.java index 49951a346c..e32a52fcc7 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleClientWithLocalMultiDiskTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleClientWithLocalMultiDiskTest.java @@ -1,5 +1,185 @@ -package org.apache.uniffle.test;/** - * @author zhangjunfan - * @date 2022/12/22 - */public class ShuffleClientWithLocalMultiDiskTest { +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.test; + +import java.io.File; +import java.util.List; +import java.util.Map; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.io.Files; +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.roaringbitmap.longlong.Roaring64NavigableMap; + +import org.apache.uniffle.client.TestUtils; +import org.apache.uniffle.client.impl.ShuffleReadClientImpl; +import org.apache.uniffle.client.impl.ShuffleWriteClientImpl; +import org.apache.uniffle.client.response.SendShuffleDataResult; +import org.apache.uniffle.client.util.DefaultIdHelper; +import org.apache.uniffle.common.ClientType; +import org.apache.uniffle.common.PartitionRange; +import org.apache.uniffle.common.RemoteStorageInfo; +import org.apache.uniffle.common.ShuffleBlockInfo; +import org.apache.uniffle.common.ShuffleDataDistributionType; +import org.apache.uniffle.common.ShuffleServerInfo; +import org.apache.uniffle.coordinator.CoordinatorConf; +import org.apache.uniffle.server.ShuffleDataReadEvent; +import org.apache.uniffle.server.ShuffleServer; +import org.apache.uniffle.server.ShuffleServerConf; +import org.apache.uniffle.storage.common.LocalStorage; +import org.apache.uniffle.storage.util.StorageType; + +import static org.apache.uniffle.server.ShuffleServerConf.RSS_LOCAL_STORAGE_MULTIPLE_DISK_SELECTION_ENABLE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * This class is to test the {@link org.apache.uniffle.server.storage.local.ChainableLocalStorageSelector} multiple + * disk selection strategy for reader. + */ +public class ShuffleClientWithLocalMultiDiskTest extends ShuffleReadWriteBase { + + private static ShuffleServerInfo shuffleServerInfo; + private ShuffleWriteClientImpl shuffleWriteClientImpl; + private ShuffleReadClientImpl shuffleReadClientImpl; + + @BeforeAll + public static void setupShuffleServers() throws Exception { + CoordinatorConf coordinatorConf = getCoordinatorConf(); + createCoordinatorServer(coordinatorConf); + + ShuffleServerConf shuffleServerConf = getShuffleServerConf(); + shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 4000000); + File tmpDir = Files.createTempDir(); + tmpDir.deleteOnExit(); + File dataDir1 = new File(tmpDir, "data1"); + File dataDir2 = new File(tmpDir, "data2"); + String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath(); + System.out.println("base: " + basePath); + shuffleServerConf.setString("rss.storage.type", StorageType.MEMORY_LOCALFILE.name()); + shuffleServerConf.setString("rss.storage.basePath", basePath); + shuffleServerConf.setLong(ShuffleServerConf.DISK_CAPACITY, 10 * 1024 * 1024); + shuffleServerConf.set(RSS_LOCAL_STORAGE_MULTIPLE_DISK_SELECTION_ENABLE, true); + createShuffleServer(shuffleServerConf); + startServers(); + shuffleServerInfo = + new ShuffleServerInfo("127.0.0.1-20001", shuffleServers.get(0).getIp(), SHUFFLE_SERVER_PORT); + } + + @BeforeEach + public void createClient() { + shuffleWriteClientImpl = new ShuffleWriteClientImpl( + ClientType.GRPC.name(), 3, 1000, 1, + 1, 1, 1, true, 1, 1, 10, 10); + } + + @AfterEach + public void closeClient() { + shuffleWriteClientImpl.close(); + } + + @Test + public void testClientRemoteReadFromMultipleDisk() throws Exception { + String appId = "testClientRemoteReadFromMultipleDisk_appId"; + shuffleWriteClientImpl.registerShuffle( + shuffleServerInfo, + appId, + 0, + Lists.newArrayList(new PartitionRange(0, 0)), + new RemoteStorageInfo(""), + ShuffleDataDistributionType.NORMAL + ); + + Map expectedData = Maps.newHashMap(); + Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf(); + + // First committing, blocks will be written to one disk + List blocks = createShuffleBlockList( + 0, 0, 0, 3, 25, blockIdBitmap, + expectedData, Lists.newArrayList(shuffleServerInfo)); + SendShuffleDataResult result = shuffleWriteClientImpl.sendShuffleData(appId, blocks, () -> false); + assertEquals(0, result.getFailedBlockIds().size()); + assertEquals(3, result.getSuccessBlockIds().size()); + + boolean commitResult = shuffleWriteClientImpl.sendCommit(Sets.newHashSet(shuffleServerInfo), appId, 0, 1); + assertTrue(commitResult); + + // Mark one storage reaching high watermark, it should switch another storage for next writing + ShuffleServer shuffleServer = shuffleServers.get(0); + ShuffleDataReadEvent readEvent = new ShuffleDataReadEvent(appId, 0, 0, 0, 0); + LocalStorage storage1 = (LocalStorage) shuffleServer.getStorageManager().selectStorage(readEvent); + storage1.getMetaData().setSize(20 * 1024 * 1024); + + blocks = createShuffleBlockList( + 0, 0, 0, 3, 25, blockIdBitmap, + expectedData, Lists.newArrayList(shuffleServerInfo)); + result = shuffleWriteClientImpl.sendShuffleData(appId, blocks, () -> false); + assertEquals(0, result.getFailedBlockIds().size()); + assertEquals(3, result.getSuccessBlockIds().size()); + commitResult = shuffleWriteClientImpl.sendCommit(Sets.newHashSet(shuffleServerInfo), appId, 0, 1); + assertTrue(commitResult); + + readEvent = new ShuffleDataReadEvent(appId, 0, 0, 0, 1); + LocalStorage storage2 = (LocalStorage) shuffleServer.getStorageManager().selectStorage(readEvent); + assertNotEquals(storage1, storage2); + + /** + * String storageType, + * String appId, + * int shuffleId, + * int partitionId, + * int indexReadLimit, + * int partitionNumPerRange, + * int partitionNum, + * int readBufferSize, + * String storageBasePath, + * Roaring64NavigableMap blockIdBitmap, + * Roaring64NavigableMap taskIdBitmap, + * List shuffleServerInfoList, + * Configuration hadoopConf, + * IdHelper idHelper) { + */ + ShuffleReadClientImpl readClient = new ShuffleReadClientImpl( + StorageType.LOCALFILE.name(), + appId, + 0, + 0, + 100, + 1, + 1, + 1000, + "", + blockIdBitmap, + Roaring64NavigableMap.bitmapOf(0), + Lists.newArrayList(shuffleServerInfo), + new Configuration(), + new DefaultIdHelper() + ); + + TestUtils.validateResult(readClient, expectedData); + readClient.checkProcessedBlockIds(); + readClient.close(); + } } diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java index df6d8a24c2..d1ab3df4f7 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java @@ -576,6 +576,8 @@ public void getLocalShuffleIndex(GetLocalShuffleIndexRequest request, int partitionId = request.getPartitionId(); int partitionNumPerRange = request.getPartitionNumPerRange(); int partitionNum = request.getPartitionNum(); + int storageId = request.getStorageId(); + StatusCode status = StatusCode.SUCCESS; String msg = "OK"; GetLocalShuffleIndexResponse reply; @@ -584,7 +586,7 @@ public void getLocalShuffleIndex(GetLocalShuffleIndexRequest request, int[] range = ShuffleStorageUtils.getPartitionRange(partitionId, partitionNumPerRange, partitionNum); Storage storage = shuffleServer.getStorageManager() - .selectStorage(new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0], request.getStorageId())); + .selectStorage(new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0], storageId)); if (storage != null) { storage.updateReadMetrics(new StorageReadMetrics(appId, shuffleId)); } @@ -596,7 +598,7 @@ public void getLocalShuffleIndex(GetLocalShuffleIndexRequest request, try { long start = System.currentTimeMillis(); ShuffleIndexResult shuffleIndexResult = shuffleServer.getShuffleTaskManager().getShuffleIndex( - appId, shuffleId, partitionId, partitionNumPerRange, partitionNum, request.getStorageId()); + appId, shuffleId, partitionId, partitionNumPerRange, partitionNum, storageId); long readTime = System.currentTimeMillis() - start; byte[] data = shuffleIndexResult.getIndexData(); diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java index f3c4e198d8..10c37404f8 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java @@ -402,9 +402,10 @@ public ShuffleDataResult getShuffleData( request.setPartitionNum(partitionNum); request.setStorageType(storageType); request.setRssBaseConf(conf); - request.setStorageSeqIndex(storageId); int[] range = ShuffleStorageUtils.getPartitionRange(partitionId, partitionNumPerRange, partitionNum); - Storage storage = storageManager.selectStorage(new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0])); + Storage storage = storageManager.selectStorage( + new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0], storageId) + ); if (storage == null) { throw new FileNotFoundException("No such data stored in current storage manager."); } @@ -429,9 +430,10 @@ public ShuffleIndexResult getShuffleIndex( request.setPartitionNum(partitionNum); request.setStorageType(storageType); request.setRssBaseConf(conf); - request.setStorageSeqIndex(storageId); int[] range = ShuffleStorageUtils.getPartitionRange(partitionId, partitionNumPerRange, partitionNum); - Storage storage = storageManager.selectStorage(new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0])); + Storage storage = storageManager.selectStorage( + new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0], storageId) + ); if (storage == null) { throw new FileNotFoundException("No such data in current storage manager."); } diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java index 09bba08543..89900bc324 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java +++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java @@ -37,7 +37,7 @@ public class LocalFileClientReadHandler extends DataSkippableReadHandler { private final int partitionNumPerRange; private final int partitionNum; private ShuffleServerClient shuffleServerClient; - private int storageIndex = 0; + private int storageIndex; public LocalFileClientReadHandler( String appId, @@ -83,6 +83,7 @@ public LocalFileClientReadHandler( this.shuffleServerClient = shuffleServerClient; this.partitionNumPerRange = partitionNumPerRange; this.partitionNum = partitionNum; + this.storageIndex = 0; } /** diff --git a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java index 880b2cd5f8..a7c2cf47a3 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java +++ b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java @@ -46,8 +46,6 @@ public class CreateShuffleReadHandlerRequest { private ShuffleDataDistributionType distributionType; private Roaring64NavigableMap expectTaskIds; private boolean expectedTaskIdsBitmapFilterEnable; - private int storageSeqIndex; - private IdHelper idHelper; public CreateShuffleReadHandlerRequest() { @@ -196,12 +194,4 @@ public IdHelper getIdHelper() { public void setIdHelper(IdHelper idHelper) { this.idHelper = idHelper; } - - public int getStorageSeqIndex() { - return storageSeqIndex; - } - - public void setStorageSeqIndex(int storageSeqIndex) { - this.storageSeqIndex = storageSeqIndex; - } } From 0bea079e49dfc83b1b7fa0189706e12abd35efa1 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Thu, 22 Dec 2022 16:17:18 +0800 Subject: [PATCH 8/9] optimize --- .../local/ChainableLocalStorageSelector.java | 25 ++++++++----------- ...ew.java => ChainableLocalStorageView.java} | 15 ++++++----- 2 files changed, 20 insertions(+), 20 deletions(-) rename server/src/main/java/org/apache/uniffle/server/storage/local/{LocalStorageView.java => ChainableLocalStorageView.java} (76%) diff --git a/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageSelector.java b/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageSelector.java index fed530405e..ca9b2b75d4 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageSelector.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageSelector.java @@ -44,12 +44,12 @@ public class ChainableLocalStorageSelector extends AbstractCacheableStorageSelec private static final Logger LOGGER = LoggerFactory.getLogger(ChainableLocalStorageSelector.class); private final List localStorages; - private final Map storageOfPartitions; + private final Map viewOfPartitions; private final boolean multipleDiskSelectionEnable; public ChainableLocalStorageSelector(ShuffleServerConf shuffleServerConf, List localStorages) { this.localStorages = localStorages; - this.storageOfPartitions = Maps.newConcurrentMap(); + this.viewOfPartitions = Maps.newConcurrentMap(); this.multipleDiskSelectionEnable = shuffleServerConf.get(RSS_LOCAL_STORAGE_MULTIPLE_DISK_SELECTION_ENABLE); } @@ -64,10 +64,10 @@ public Storage selectForWriter(ShuffleDataFlushEvent event) { event.getShuffleId(), event.getStartPartition() ); - LocalStorageView view = storageOfPartitions.get(cacheKey); + ChainableLocalStorageView view = viewOfPartitions.get(cacheKey); LocalStorage lastStorage = null; if (view != null) { - lastStorage = view.getLatest(); + lastStorage = view.get(); if (lastStorage.isCorrupted()) { if (lastStorage.containsWriteHandler(appId, shuffleId, partitionId)) { LOGGER.error("LocalStorage: {} is corrupted. Switching another storage for event: {}, " @@ -98,20 +98,17 @@ public Storage selectForWriter(ShuffleDataFlushEvent event) { ); final LocalStorage previousStorage = lastStorage; - storageOfPartitions.compute( + viewOfPartitions.compute( cacheKey, (key, storageView) -> { if (storageView == null) { - return new LocalStorageView(selected); + return new ChainableLocalStorageView(selected); } - // If the storage is corrupted, it should be removed from the stoarge view. + // If the storage is corrupted, it should be removed from the storage view. if (previousStorage != null && previousStorage.isCorrupted()) { - LocalStorage currentTailStorage = storageView.getLatest(); - if (previousStorage == currentTailStorage) { - storageView.removeTail(); - } + storageView.remove(previousStorage); } - storageView.add(selected); + storageView.switchTo(selected); return storageView; } ); @@ -122,7 +119,7 @@ public Storage selectForWriter(ShuffleDataFlushEvent event) { @Override public Storage getForReader(ShuffleDataReadEvent event) { try { - LocalStorageView view = storageOfPartitions.get( + ChainableLocalStorageView view = viewOfPartitions.get( UnionKey.buildKey( event.getAppId(), event.getShuffleId(), @@ -153,7 +150,7 @@ public void removeCache(PurgeEvent event) { ); } deleteElement( - storageOfPartitions, + viewOfPartitions, deleteConditionFunc ); } diff --git a/server/src/main/java/org/apache/uniffle/server/storage/local/LocalStorageView.java b/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageView.java similarity index 76% rename from server/src/main/java/org/apache/uniffle/server/storage/local/LocalStorageView.java rename to server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageView.java index dd11b9fe33..f2852dcf0d 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/local/LocalStorageView.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageView.java @@ -22,25 +22,28 @@ import org.apache.uniffle.storage.common.LocalStorage; -public class LocalStorageView { +/** + * This class is to wrap multiple local storages into single view. + */ +public class ChainableLocalStorageView { private final List localStorages; - public LocalStorageView(LocalStorage localStorage) { + public ChainableLocalStorageView(LocalStorage localStorage) { this.localStorages = new ArrayList<>(); localStorages.add(localStorage); } - public void add(LocalStorage localStorage) { + public synchronized void switchTo(LocalStorage localStorage) { localStorages.add(localStorage); } - public LocalStorage getLatest() { + public synchronized LocalStorage get() { return localStorages.get(localStorages.size() - 1); } - public void removeTail() { - localStorages.remove(localStorages.size() - 1); + public synchronized void remove(LocalStorage localStorage) { + localStorages.remove(localStorage); } public LocalStorage get(int index) { From 5f7c22d4e326e67758d8ab66d6fa821e41d17b39 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Thu, 22 Dec 2022 16:23:14 +0800 Subject: [PATCH 9/9] fix --- .../storage/local/ChainableLocalStorageView.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageView.java b/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageView.java index f2852dcf0d..84613199c9 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageView.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/local/ChainableLocalStorageView.java @@ -34,19 +34,19 @@ public ChainableLocalStorageView(LocalStorage localStorage) { localStorages.add(localStorage); } - public synchronized void switchTo(LocalStorage localStorage) { + public void switchTo(LocalStorage localStorage) { localStorages.add(localStorage); } - public synchronized LocalStorage get() { + public LocalStorage get() { return localStorages.get(localStorages.size() - 1); } - public synchronized void remove(LocalStorage localStorage) { - localStorages.remove(localStorage); - } - public LocalStorage get(int index) { return localStorages.get(index); } + + public void remove(LocalStorage localStorage) { + localStorages.remove(localStorage); + } }