Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ private static boolean requireRemoteStorage(String storageType) {
return StorageType.MEMORY_HDFS.name().equals(storageType)
|| StorageType.MEMORY_LOCALFILE_HDFS.name().equals(storageType)
|| StorageType.HDFS.name().equals(storageType)
|| StorageType.LOCALFILE_HDFS.name().equals(storageType)
|| StorageType.LOCALFILE_HDFS_2.name().equals(storageType);
|| StorageType.LOCALFILE_HDFS.name().equals(storageType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ protected static ShuffleServerConf getShuffleServerConf() throws Exception {
serverConf.setInteger("rss.jetty.corePool.size", 64);
serverConf.setInteger("rss.rpc.executor.size", 10);
serverConf.setString("rss.server.hadoop.dfs.replication", "2");
serverConf.setString("rss.server.uploader.base.path", "test");
serverConf.setLong("rss.server.disk.capacity", 10L * 1024L * 1024L * 1024L);
serverConf.setBoolean("rss.server.health.check.enable", false);
return serverConf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,10 @@
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.ShuffleStorageUtils;
import org.apache.uniffle.storage.util.StorageType;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

public class MultiStorageFaultToleranceTest extends ShuffleReadWriteBase {
private ShuffleServerGrpcClient shuffleServerClient;
Expand All @@ -65,14 +63,11 @@ public static void setupServers() throws Exception {
shuffleServerConf.setDouble(ShuffleServerConf.CLEANUP_THRESHOLD, 0.0);
shuffleServerConf.setDouble(ShuffleServerConf.HIGH_WATER_MARK_OF_WRITE, 100.0);
shuffleServerConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 100);
shuffleServerConf.setBoolean(ShuffleServerConf.UPLOADER_ENABLE, true);
shuffleServerConf.setLong(ShuffleServerConf.PENDING_EVENT_TIMEOUT_SEC, 30L);
shuffleServerConf.setString(ShuffleServerConf.UPLOADER_BASE_PATH, REMOTE_STORAGE);
shuffleServerConf.setLong(ShuffleServerConf.UPLOAD_COMBINE_THRESHOLD_MB, 1L);
shuffleServerConf.setLong(ShuffleServerConf.SHUFFLE_EXPIRED_TIMEOUT_MS, 5000L);
shuffleServerConf.setLong(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 60L * 1000L * 60L);
shuffleServerConf.setLong(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 20L * 1000L);
shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE_HDFS_2.name());
shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE_HDFS.name());
shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH, basePath);
shuffleServerConf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 400L * 1024L * 1024L);
createAndStartServers(shuffleServerConf, coordinatorConf);
Expand All @@ -88,60 +83,6 @@ public void closeClient() {
shuffleServerClient.close();
}

@Test
public void hdfsFaultTolerance() {
try {
String appId = "app_hdfs_fault_tolerance_data";
Map<Long, byte[]> expectedData = Maps.newHashMap();
Map<Integer, List<Integer>> map = Maps.newHashMap();
map.put(2, Lists.newArrayList(0, 3));
map.put(3, Lists.newArrayList(3));
registerShuffle(appId, map);

Roaring64NavigableMap blockIdBitmap1 = Roaring64NavigableMap.bitmapOf();
Roaring64NavigableMap blockIdBitmap2 = Roaring64NavigableMap.bitmapOf();
Roaring64NavigableMap blockIdBitmap3 = Roaring64NavigableMap.bitmapOf();

List<ShuffleBlockInfo> blocks1 = createShuffleBlockList(
2, 0, 1,11, 10 * 1024 * 1024, blockIdBitmap1, expectedData);

List<ShuffleBlockInfo> blocks2 = createShuffleBlockList(
3, 3, 2,9, 10 * 1024 * 1024, blockIdBitmap2, expectedData);

List<ShuffleBlockInfo> blocks3 = createShuffleBlockList(
2, 3, 2,9, 10 * 1024 * 1024, blockIdBitmap3, expectedData);

assertEquals(0, ShuffleStorageUtils.getStorageIndex(2, appId, 2, 0));
assertEquals(0, ShuffleStorageUtils.getStorageIndex(2, appId, 3, 3));
assertEquals(0, ShuffleStorageUtils.getStorageIndex(2, appId, 2, 3));
assertEquals(1, cluster.getDataNodes().size());
cluster.stopDataNode(0);
assertEquals(0, cluster.getDataNodes().size());

sendSinglePartitionToShuffleServer(appId, 2, 0, 1, blocks1);
boolean isException = false;
try {
sendSinglePartitionToShuffleServer(appId, 3, 3,2, blocks2);
} catch (RuntimeException re) {
isException = true;
assertTrue(re.getMessage().contains("Fail to finish"));
}
assertTrue(isException);

cluster.startDataNodes(conf, 1, true, HdfsServerConstants.StartupOption.REGULAR,
null, null, null, false, true);
assertEquals(1, cluster.getDataNodes().size());

sendSinglePartitionToShuffleServer(appId, 2, 3, 2, blocks3);

validateResult(appId, 2, 0, blockIdBitmap1, Roaring64NavigableMap.bitmapOf(1), expectedData);
validateResult(appId, 2, 3, blockIdBitmap3, Roaring64NavigableMap.bitmapOf(2), expectedData);
} catch (Exception e) {
e.printStackTrace();
fail();
}
}

@Test
public void hdfsFallbackTest() throws Exception {
String appId = "fallback_test";
Expand All @@ -157,9 +98,6 @@ public void hdfsFallbackTest() throws Exception {
assertEquals(0, cluster.getDataNodes().size());
sendSinglePartitionToShuffleServer(appId, 0, 0, 0, blocks);
validateResult(appId, 0, 0, blockBitmap, Roaring64NavigableMap.bitmapOf(0), expectedData);
cluster.startDataNodes(conf, 1, true, HdfsServerConstants.StartupOption.REGULAR,
null, null, null, false, true);
assertEquals(1, cluster.getDataNodes().size());
}

private void registerShuffle(String appId, Map<Integer, List<Integer>> registerMap) {
Expand All @@ -172,51 +110,6 @@ private void registerShuffle(String appId, Map<Integer, List<Integer>> registerM
}
}

@Test
public void diskFaultTolerance() {
String appId = "app_disk_fault_tolerance_data";
Map<Long, byte[]> expectedData = Maps.newHashMap();

Map<Integer, List<Integer>> map = Maps.newHashMap();
map.put(2, Lists.newArrayList(1, 3));
map.put(3, Lists.newArrayList(1));
registerShuffle(appId, map);

Roaring64NavigableMap blockIdBitmap1 = Roaring64NavigableMap.bitmapOf();
Roaring64NavigableMap blockIdBitmap2 = Roaring64NavigableMap.bitmapOf();
Roaring64NavigableMap blockIdBitmap3 = Roaring64NavigableMap.bitmapOf();
Roaring64NavigableMap blockIdBitmap4 = Roaring64NavigableMap.bitmapOf();

List<ShuffleBlockInfo> blocks1 = createShuffleBlockList(
2, 1, 1,11, 10 * 1024 * 1024, blockIdBitmap1, expectedData);

List<ShuffleBlockInfo> blocks2 = createShuffleBlockList(
3, 1, 2,9, 10 * 1024 * 1024, blockIdBitmap2, expectedData);

List<ShuffleBlockInfo> blocks3 = createShuffleBlockList(
2, 3, 2,9, 10 * 1024 * 1024, blockIdBitmap3, expectedData);

List<ShuffleBlockInfo> blocks4 = createShuffleBlockList(
2, 1, 1, 11, 10 * 1024 * 1024, blockIdBitmap4, expectedData);

assertEquals(1, ShuffleStorageUtils.getStorageIndex(2, appId, 2, 1));
assertEquals(1, ShuffleStorageUtils.getStorageIndex(2, appId, 3, 1));
assertEquals(1, ShuffleStorageUtils.getStorageIndex(2, appId, 2, 3));
assertEquals(1, ShuffleStorageUtils.getStorageIndex(2, appId, 2, 1));
try {
sendSinglePartitionToShuffleServer(appId, 2, 1, 1, blocks1);
sendSinglePartitionToShuffleServer(appId, 3, 1,2, blocks2);
sendSinglePartitionToShuffleServer(appId, 2, 3, 2, blocks3);
sendSinglePartitionToShuffleServer(appId, 2, 1, 1, blocks4);
} catch (Exception e) {
e.printStackTrace();
fail();
}
validateResult(appId, 2, 1, blockIdBitmap1, Roaring64NavigableMap.bitmapOf(1), expectedData);
validateResult(appId, 3, 1, blockIdBitmap2, Roaring64NavigableMap.bitmapOf(2), expectedData);
validateResult(appId, 2, 3, blockIdBitmap3, Roaring64NavigableMap.bitmapOf(2), expectedData);
}

private void sendSinglePartitionToShuffleServer(String appId, int shuffle, int partition,
long taskAttemptId, List<ShuffleBlockInfo> blocks) {
Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = Maps.newHashMap();
Expand All @@ -239,7 +132,7 @@ private void sendSinglePartitionToShuffleServer(String appId, int shuffle, int p

protected void validateResult(String appId, int shuffleId, int partitionId, Roaring64NavigableMap blockBitmap,
Roaring64NavigableMap taskBitmap, Map<Long, byte[]> expectedData) {
ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.LOCALFILE_HDFS_2.name(),
ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.LOCALFILE_HDFS.name(),
appId, shuffleId, partitionId, 100, 1, 10, 1000, REMOTE_STORAGE, blockBitmap, taskBitmap,
Lists.newArrayList(new ShuffleServerInfo("test", LOCALHOST, SHUFFLE_SERVER_PORT)), conf, new DefaultIdHelper());
CompressedShuffleBlock csb = readClient.readShuffleBlockData();
Expand All @@ -256,7 +149,6 @@ protected void validateResult(String appId, int shuffleId, int partitionId, Roar
assertTrue(blockBitmap.equals(matched));
}


private Set<Long> getExpectBlockIds(List<ShuffleBlockInfo> blocks) {
List<Long> expectBlockIds = Lists.newArrayList();
blocks.forEach(b -> expectBlockIds.add(b.getBlockId()));
Expand Down
Loading