diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java index 0af62d4204..dd78541aaf 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java @@ -352,8 +352,6 @@ public class ShuffleServerConf extends RssBaseConf { public static final ConfigOption FALLBACK_MAX_FAIL_TIMES = ConfigOptions.key("rss.server.hybrid.storage.fallback.max.fail.times") .longType() - .checkValue( - ConfigUtils.NON_NEGATIVE_LONG_VALIDATOR, " fallback times must be non-negative") .defaultValue(0L) .withDescription("For hybrid storage, fail times exceed the number, will switch storage") .withDeprecatedKeys("rss.server.multistorage.fallback.max.fail.times"); diff --git a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManagerFallbackStrategy.java b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManagerFallbackStrategy.java index 48a0858ee9..e569578db3 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManagerFallbackStrategy.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManagerFallbackStrategy.java @@ -37,7 +37,7 @@ public HadoopStorageManagerFallbackStrategy(ShuffleServerConf conf) { @Override public StorageManager tryFallback( StorageManager current, ShuffleDataFlushEvent event, StorageManager... candidates) { - if (event.getRetryTimes() > fallBackTimes) { + if (event.getRetryTimes() > fallBackTimes || !current.canWrite(event)) { return findNextStorageManager(current, excludeTypes, event, candidates); } return current; diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManagerFallbackStrategy.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManagerFallbackStrategy.java index d506fe97d1..565fa2b3ce 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManagerFallbackStrategy.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManagerFallbackStrategy.java @@ -37,7 +37,7 @@ public LocalStorageManagerFallbackStrategy(ShuffleServerConf conf) { @Override public StorageManager tryFallback( StorageManager current, ShuffleDataFlushEvent event, StorageManager... candidates) { - if (event.getRetryTimes() > fallBackTimes) { + if (event.getRetryTimes() > fallBackTimes || !current.canWrite(event)) { return findNextStorageManager(current, excludeTypes, event, candidates); } return current; diff --git a/server/src/main/java/org/apache/uniffle/server/storage/RotateStorageManagerFallbackStrategy.java b/server/src/main/java/org/apache/uniffle/server/storage/RotateStorageManagerFallbackStrategy.java index 0f61643d96..623438f206 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/RotateStorageManagerFallbackStrategy.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/RotateStorageManagerFallbackStrategy.java @@ -32,7 +32,8 @@ public RotateStorageManagerFallbackStrategy(ShuffleServerConf conf) { public StorageManager tryFallback( StorageManager current, ShuffleDataFlushEvent event, StorageManager... candidates) { if (fallBackTimes > 0 - && (event.getRetryTimes() < fallBackTimes || event.getRetryTimes() % fallBackTimes > 0)) { + && (event.getRetryTimes() < fallBackTimes || event.getRetryTimes() % fallBackTimes > 0) + && current.canWrite(event)) { return current; } return findNextStorageManager(current, null, event, candidates); diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java index ec767bc634..f2b0b3048d 100644 --- a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java @@ -697,16 +697,15 @@ public void fallbackWrittenWhenHybridStorageManagerEnableTest(@TempDir File temp List blocks = Lists.newArrayList(new ShufflePartitionedBlock(100000, 1000, 1, 1, 1L, (byte[]) null)); ShuffleDataFlushEvent bigEvent = - new ShuffleDataFlushEvent(1, "1", 1, 1, 1, 100, blocks, null, null); + new ShuffleDataFlushEvent(1, "1", 2, 1, 1, 100, blocks, null, null); bigEvent.setUnderStorage( ((HybridStorageManager) storageManager).getWarmStorageManager().selectStorage(event)); ((HybridStorageManager) storageManager).getWarmStorageManager().updateWriteMetrics(bigEvent, 0); - event = createShuffleDataFlushEvent(appId, 1, 1, 1, null, 100); + event = createShuffleDataFlushEvent(appId, 3, 1, 1, null, 100); flushManager.addToFlushQueue(event); Thread.sleep(1000); assertTrue(event.getUnderStorage() instanceof HadoopStorage); - assertEquals(1, event.getRetryTimes()); } @Test @@ -756,11 +755,11 @@ public void defaultFlushEventHandlerTest(@TempDir File tempDir) throws Exception ((HybridStorageManager) storageManager).getWarmStorageManager().selectStorage(event)); ((HybridStorageManager) storageManager).getWarmStorageManager().updateWriteMetrics(bigEvent, 0); - event = createShuffleDataFlushEvent(appId, 1, 1, 1, null, 100); + event = createShuffleDataFlushEvent(appId, 2, 1, 1, null, 100); flushManager.addToFlushQueue(event); - waitForFlush(flushManager, appId, 1, 15); - assertEquals(1, event.getRetryTimes()); - assertEquals(2, ShuffleServerMetrics.counterLocalFileEventFlush.get()); + waitForFlush(flushManager, appId, 2, 5); + assertEquals(0, event.getRetryTimes()); + assertEquals(1, ShuffleServerMetrics.counterLocalFileEventFlush.get()); assertEquals(2, ShuffleServerMetrics.counterHadoopEventFlush.get()); } diff --git a/server/src/test/java/org/apache/uniffle/server/storage/HybridStorageManagerTest.java b/server/src/test/java/org/apache/uniffle/server/storage/HybridStorageManagerTest.java index d3e12a284d..7fe6f3bc0e 100644 --- a/server/src/test/java/org/apache/uniffle/server/storage/HybridStorageManagerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/storage/HybridStorageManagerTest.java @@ -36,6 +36,54 @@ public class HybridStorageManagerTest { + /** + * this tests the fallback strategy when encountering the local storage is invalid. 1. When + * specifying the fallback max fail time = 0, the event will be discarded 2. When specifying the + * fallback max fail time < 0, the event will be taken by Hadoop Storage. + */ + @Test + public void fallbackTestWhenLocalStorageCorrupted() { + ShuffleServerConf conf = new ShuffleServerConf(); + conf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 2000L); + conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("test")); + conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 1024L); + conf.setString( + ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE_HDFS.name()); + conf.setString( + ShuffleServerConf.HYBRID_STORAGE_MANAGER_SELECTOR_CLASS, + "org.apache.uniffle.server.storage.hybrid.HugePartitionSensitiveStorageManagerSelector"); + conf.setString( + ShuffleServerConf.HYBRID_STORAGE_FALLBACK_STRATEGY_CLASS, + "org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy"); + + // case1: fallback to hadoop storage when fallback_max_fail_time = -1 + conf.setLong(ShuffleServerConf.FALLBACK_MAX_FAIL_TIMES, -1); + HybridStorageManager manager = new HybridStorageManager(conf); + + LocalStorageManager localStorageManager = (LocalStorageManager) manager.getWarmStorageManager(); + localStorageManager.getStorages().get(0).markCorrupted(); + + String remoteStorage = "test"; + String appId = "selectStorageManagerWithSelectorAndFallbackStrategy_appId"; + manager.registerRemoteStorage(appId, new RemoteStorageInfo(remoteStorage)); + List blocks = + Lists.newArrayList(new ShufflePartitionedBlock(100, 1000, 1, 1, 1L, (byte[]) null)); + ShuffleDataFlushEvent event = + new ShuffleDataFlushEvent(1, appId, 1, 1, 1, 1000, blocks, null, null); + assertTrue((manager.selectStorage(event) instanceof HadoopStorage)); + + // case2: fallback is still valid when fallback_max_fail_time = 0 + conf.setLong(ShuffleServerConf.FALLBACK_MAX_FAIL_TIMES, 0); + manager = new HybridStorageManager(conf); + + localStorageManager = (LocalStorageManager) manager.getWarmStorageManager(); + localStorageManager.getStorages().get(0).markCorrupted(); + + event = new ShuffleDataFlushEvent(1, appId, 1, 1, 1, 1000, blocks, null, null); + manager.registerRemoteStorage(appId, new RemoteStorageInfo(remoteStorage)); + assertTrue((manager.selectStorage(event) instanceof HadoopStorage)); + } + @Test public void selectStorageManagerTest() { ShuffleServerConf conf = new ShuffleServerConf();