diff --git a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java index 982d9f77f7..d139a5319c 100644 --- a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java @@ -43,7 +43,7 @@ public GrpcServer getServer() { public GrpcServer getServer(ShuffleManagerGrpcService service) { ServerType type = conf.get(RssBaseConf.RPC_SERVER_TYPE); - if (type == ServerType.GRPC) { + if (type == ServerType.GRPC || type == ServerType.GRPC_NETTY) { if (service == null) { service = new ShuffleManagerGrpcService(shuffleManager); } diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java index b7a44bf21a..3f6993c826 100644 --- a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java +++ b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java @@ -44,6 +44,7 @@ import org.apache.uniffle.client.api.ShuffleReadClient; import org.apache.uniffle.client.factory.ShuffleClientFactory; import org.apache.uniffle.client.impl.ShuffleReadClientImpl; +import org.apache.uniffle.common.ClientType; import org.apache.uniffle.common.ShuffleServerInfo; import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.util.BlockIdLayout; @@ -124,6 +125,7 @@ private RssShuffleDataIterator getDataIterator( boolean compress) { ShuffleReadClientImpl readClient = ShuffleClientFactory.newReadBuilder() + .clientType(ClientType.GRPC) .storageType(StorageType.HDFS.name()) .appId("appId") .shuffleId(0) diff --git a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactoryTest.java b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactoryTest.java index e17d4ef006..34a1f24024 100644 --- a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactoryTest.java +++ b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactoryTest.java @@ -17,26 +17,29 @@ package org.apache.uniffle.shuffle.manager; -import org.junit.jupiter.api.Test; +import java.util.Arrays; +import java.util.stream.Stream; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.apache.uniffle.common.config.RssBaseConf; import org.apache.uniffle.common.rpc.ServerType; -import static org.junit.jupiter.api.Assertions.assertThrows; - public class ShuffleManagerServerFactoryTest { - @Test - public void testShuffleManagerServerType() { + private static Stream shuffleManagerServerTypeProvider() { + return Arrays.stream(ServerType.values()).map(Arguments::of); + } + + @ParameterizedTest + @MethodSource("shuffleManagerServerTypeProvider") + public void testShuffleManagerServerType(ServerType serverType) { // add code to generate tests that check the server type RssBaseConf conf = new RssBaseConf(); - conf.set(RssBaseConf.RPC_SERVER_TYPE, ServerType.GRPC); + conf.set(RssBaseConf.RPC_SERVER_TYPE, serverType); ShuffleManagerServerFactory factory = new ShuffleManagerServerFactory(null, conf); // this should execute normally; factory.getServer(); - - // other types should raise an exception - conf.set(RssBaseConf.RPC_SERVER_TYPE, ServerType.GRPC_NETTY); - factory = new ShuffleManagerServerFactory(null, conf); - assertThrows(UnsupportedOperationException.class, factory::getServer); } } diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java index 4a789bfa2c..e1aa0f9582 100644 --- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java +++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java @@ -102,12 +102,13 @@ public ShuffleReadClientImpl(ShuffleClientFactory.ReadClientBuilder builder) { readBufferSize = Integer.MAX_VALUE; } boolean offHeapEnabled = builder.getRssConf().get(RssClientConf.OFF_HEAP_MEMORY_ENABLE); - builder.indexReadLimit(indexReadLimit); builder.storageType(storageType); builder.readBufferSize(readBufferSize); builder.offHeapEnable(offHeapEnabled); - builder.clientType(builder.getRssConf().get(RssClientConf.RSS_CLIENT_TYPE)); + if (builder.getClientType() == null) { + builder.clientType(builder.getRssConf().get(RssClientConf.RSS_CLIENT_TYPE)); + } } else { // most for test RssConf rssConf = (builder.getRssConf() == null) ? new RssConf() : builder.getRssConf(); @@ -131,7 +132,9 @@ public ShuffleReadClientImpl(ShuffleClientFactory.ReadClientBuilder builder) { builder.rssConf(rssConf); builder.offHeapEnable(false); builder.expectedTaskIdsBitmapFilterEnable(false); - builder.clientType(rssConf.get(RssClientConf.RSS_CLIENT_TYPE)); + if (builder.getClientType() == null) { + builder.clientType(rssConf.get(RssClientConf.RSS_CLIENT_TYPE)); + } } if (builder.getIdHelper() == null) { builder.idHelper(new DefaultIdHelper(BlockIdLayout.from(builder.getRssConf()))); diff --git a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java index 4c0679a110..5d0d4b4106 100644 --- a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java +++ b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java @@ -36,6 +36,7 @@ import org.apache.uniffle.client.TestUtils; import org.apache.uniffle.client.factory.ShuffleClientFactory; import org.apache.uniffle.client.response.CompressedShuffleBlock; +import org.apache.uniffle.common.ClientType; import org.apache.uniffle.common.ShufflePartitionedBlock; import org.apache.uniffle.common.ShuffleServerInfo; import org.apache.uniffle.common.config.RssClientConf; @@ -64,6 +65,7 @@ public class ShuffleReadClientImplTest extends HadoopTestBase { private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() { return ShuffleClientFactory.newReadBuilder() + .clientType(ClientType.GRPC) .storageType(StorageType.HDFS.name()) .appId("appId") .shuffleId(0) diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java index 0661351554..4029d1ca6b 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java @@ -51,8 +51,6 @@ import org.apache.uniffle.common.PartitionRange; import org.apache.uniffle.common.ShuffleBlockInfo; import org.apache.uniffle.common.ShuffleServerInfo; -import org.apache.uniffle.common.config.RssClientConf; -import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.common.rpc.StatusCode; import org.apache.uniffle.coordinator.CoordinatorConf; @@ -99,11 +97,8 @@ public void createClient(@TempDir File serverTmpDir) throws Exception { grpcShuffleServerClient = new ShuffleServerGrpcClient( LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)); - RssConf rssConf = new RssConf(); - rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY); nettyShuffleServerClient = new ShuffleServerGrpcNettyClient( - rssConf, LOCALHOST, nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT), nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT)); @@ -193,6 +188,7 @@ private void diskErrorTest(boolean isNettyMode) throws Exception { isNettyMode ? nettyShuffleServerInfoList : grpcShuffleServerInfoList; ShuffleReadClientImpl readClient = ShuffleClientFactory.newReadBuilder() + .clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC) .storageType(StorageType.LOCALFILE.name()) .appId(appId) .shuffleId(0) diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageFaultToleranceBase.java b/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageFaultToleranceBase.java index eb44a9b69f..8a2fe2635c 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageFaultToleranceBase.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageFaultToleranceBase.java @@ -33,6 +33,7 @@ import org.junit.jupiter.params.provider.MethodSource; import org.roaringbitmap.longlong.Roaring64NavigableMap; +import org.apache.uniffle.client.api.ShuffleServerClient; import org.apache.uniffle.client.factory.ShuffleClientFactory; import org.apache.uniffle.client.factory.ShuffleServerClientFactory; import org.apache.uniffle.client.impl.ShuffleReadClientImpl; @@ -49,8 +50,6 @@ import org.apache.uniffle.common.PartitionRange; import org.apache.uniffle.common.ShuffleBlockInfo; import org.apache.uniffle.common.ShuffleServerInfo; -import org.apache.uniffle.common.config.RssClientConf; -import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.rpc.StatusCode; import org.apache.uniffle.server.ShuffleServerConf; import org.apache.uniffle.storage.util.StorageType; @@ -63,7 +62,7 @@ public abstract class HybridStorageFaultToleranceBase extends ShuffleReadWriteBa protected ShuffleServerGrpcNettyClient nettyShuffleServerClient; protected static ShuffleServerConf grpcShuffleServerConfig; protected static ShuffleServerConf nettyShuffleServerConfig; - private static String REMOTE_STORAGE = HDFS_URI + "rss/multi_storage_fault"; + private static String REMOTE_STORAGE = HDFS_URI + "rss/multi_storage_fault_%s"; @BeforeEach public void createClient() throws Exception { @@ -71,11 +70,8 @@ public void createClient() throws Exception { grpcShuffleServerClient = new ShuffleServerGrpcClient( LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)); - RssConf rssConf = new RssConf(); - rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY); nettyShuffleServerClient = new ShuffleServerGrpcNettyClient( - rssConf, LOCALHOST, nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT), nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT)); @@ -100,7 +96,7 @@ private void fallbackTest(boolean isNettyMode) throws Exception { Map expectedData = Maps.newHashMap(); Map> map = Maps.newHashMap(); map.put(0, Lists.newArrayList(0)); - registerShuffle(appId, map); + registerShuffle(appId, map, isNettyMode); Roaring64NavigableMap blockBitmap = Roaring64NavigableMap.bitmapOf(); final List blocks = createShuffleBlockList(0, 0, 0, 40, 2 * 1024 * 1024, blockBitmap, expectedData); @@ -110,7 +106,10 @@ private void fallbackTest(boolean isNettyMode) throws Exception { appId, 0, 0, blockBitmap, Roaring64NavigableMap.bitmapOf(0), expectedData, isNettyMode); } - private void registerShuffle(String appId, Map> registerMap) { + private void registerShuffle( + String appId, Map> registerMap, boolean isNettyMode) { + ShuffleServerClient shuffleServerClient = + isNettyMode ? nettyShuffleServerClient : grpcShuffleServerClient; for (Map.Entry> entry : registerMap.entrySet()) { for (int partition : entry.getValue()) { RssRegisterShuffleRequest rr = @@ -118,8 +117,8 @@ private void registerShuffle(String appId, Map> registerM appId, entry.getKey(), Lists.newArrayList(new PartitionRange(partition, partition)), - REMOTE_STORAGE); - grpcShuffleServerClient.registerShuffle(rr); + String.format(REMOTE_STORAGE, isNettyMode)); + shuffleServerClient.registerShuffle(rr); } } } @@ -171,6 +170,7 @@ protected void validateResult( LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)); ShuffleReadClientImpl readClient = ShuffleClientFactory.newReadBuilder() + .clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC) .storageType(StorageType.LOCALFILE_HDFS.name()) .appId(appId) .shuffleId(shuffleId) @@ -179,7 +179,7 @@ protected void validateResult( .partitionNumPerRange(1) .partitionNum(10) .readBufferSize(1000) - .basePath(REMOTE_STORAGE) + .basePath(String.format(REMOTE_STORAGE, isNettyMode)) .blockIdBitmap(blockBitmap) .taskIdBitmap(taskBitmap) .shuffleServerInfoList(Lists.newArrayList(ssi)) diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageHadoopFallbackTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageHadoopFallbackTest.java index 7b53fe606a..4ec4fff327 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageHadoopFallbackTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageHadoopFallbackTest.java @@ -27,9 +27,6 @@ import org.apache.uniffle.client.factory.ShuffleServerClientFactory; import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient; import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcNettyClient; -import org.apache.uniffle.common.ClientType; -import org.apache.uniffle.common.config.RssClientConf; -import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.coordinator.CoordinatorConf; import org.apache.uniffle.server.ShuffleServerConf; @@ -83,11 +80,8 @@ public void createClient() throws Exception { grpcShuffleServerClient = new ShuffleServerGrpcClient( LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)); - RssConf rssConf = new RssConf(); - rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY); nettyShuffleServerClient = new ShuffleServerGrpcNettyClient( - rssConf, LOCALHOST, nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT), nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT)); diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageLocalFileFallbackTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageLocalFileFallbackTest.java index 6c63f44eab..15680573bb 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageLocalFileFallbackTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageLocalFileFallbackTest.java @@ -27,9 +27,6 @@ import org.apache.uniffle.client.factory.ShuffleServerClientFactory; import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient; import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcNettyClient; -import org.apache.uniffle.common.ClientType; -import org.apache.uniffle.common.config.RssClientConf; -import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.coordinator.CoordinatorConf; import org.apache.uniffle.server.ShuffleServerConf; @@ -68,11 +65,8 @@ public void createClient() throws Exception { grpcShuffleServerClient = new ShuffleServerGrpcClient( LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)); - RssConf rssConf = new RssConf(); - rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY); nettyShuffleServerClient = new ShuffleServerGrpcNettyClient( - rssConf, LOCALHOST, nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT), nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT)); diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java index da9dd71580..6662a36703 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java @@ -75,6 +75,7 @@ public class QuorumTest extends ShuffleReadWriteBase { private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() { return ShuffleClientFactory.newReadBuilder() + .clientType(ClientType.GRPC) .storageType(StorageType.MEMORY_LOCALFILE.name()) .shuffleId(0) .partitionId(0) diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/RpcClientRetryTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/RpcClientRetryTest.java index abefb1b0dd..d082ae0259 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/RpcClientRetryTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/RpcClientRetryTest.java @@ -63,6 +63,7 @@ public class RpcClientRetryTest extends ShuffleReadWriteBase { private ShuffleClientFactory.ReadClientBuilder baseReadBuilder(StorageType storageType) { return ShuffleClientFactory.newReadBuilder() + .clientType(ClientType.GRPC) .storageType(storageType.name()) .shuffleId(0) .partitionId(0) diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java index ec5222919f..7d5d11481b 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java @@ -46,6 +46,7 @@ import org.apache.uniffle.client.request.RssSendCommitRequest; import org.apache.uniffle.client.request.RssSendShuffleDataRequest; import org.apache.uniffle.client.response.RssSendShuffleDataResponse; +import org.apache.uniffle.common.ClientType; import org.apache.uniffle.common.PartitionRange; import org.apache.uniffle.common.RemoteStorageInfo; import org.apache.uniffle.common.ShuffleBlockInfo; @@ -177,6 +178,7 @@ public void testConcurrentWrite2Hadoop( }); ShuffleReadClientImpl readClient = ShuffleClientFactory.newReadBuilder() + .clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC) .storageType(StorageType.HDFS.name()) .appId(appId) .shuffleId(0) diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java index 631cd7fe93..2eee21227d 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java @@ -47,8 +47,6 @@ import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.ShuffleDataResult; import org.apache.uniffle.common.ShuffleServerInfo; -import org.apache.uniffle.common.config.RssClientConf; -import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.common.util.ByteBufUtils; import org.apache.uniffle.coordinator.CoordinatorConf; @@ -88,8 +86,6 @@ public void setupServers(@TempDir File tmpDir) throws Exception { startServers(); grpcShuffleServerClients = new ArrayList<>(); nettyShuffleServerClients = new ArrayList<>(); - RssConf rssConf = new RssConf(); - rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY); for (ShuffleServer shuffleServer : grpcShuffleServers) { grpcShuffleServerClients.add( new ShuffleServerGrpcClient(shuffleServer.getIp(), shuffleServer.getGrpcPort())); @@ -97,7 +93,7 @@ public void setupServers(@TempDir File tmpDir) throws Exception { for (ShuffleServer shuffleServer : nettyShuffleServers) { nettyShuffleServerClients.add( new ShuffleServerGrpcNettyClient( - rssConf, LOCALHOST, shuffleServer.getGrpcPort(), shuffleServer.getNettyPort())); + LOCALHOST, shuffleServer.getGrpcPort(), shuffleServer.getNettyPort())); } } diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java index 39e3deefbc..9b486629c2 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java @@ -62,15 +62,12 @@ import org.apache.uniffle.client.response.RssRegisterShuffleResponse; import org.apache.uniffle.client.response.RssReportShuffleResultResponse; import org.apache.uniffle.client.response.RssSendShuffleDataResponse; -import org.apache.uniffle.common.ClientType; import org.apache.uniffle.common.PartitionRange; import org.apache.uniffle.common.RemoteStorageInfo; import org.apache.uniffle.common.ShuffleBlockInfo; import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.ShuffleServerInfo; import org.apache.uniffle.common.config.RssBaseConf; -import org.apache.uniffle.common.config.RssClientConf; -import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.metrics.TestUtils; import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.common.rpc.StatusCode; @@ -149,11 +146,8 @@ public void createClient() throws Exception { grpcShuffleServerClient = new ShuffleServerGrpcClient( LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)); - RssConf rssConf = new RssConf(); - rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY); nettyShuffleServerClient = new ShuffleServerGrpcNettyClient( - rssConf, LOCALHOST, nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT), nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT)); diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHadoopTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHadoopTest.java index c899b8eac8..67a9dc05c4 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHadoopTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHadoopTest.java @@ -46,8 +46,6 @@ import org.apache.uniffle.common.PartitionRange; import org.apache.uniffle.common.ShuffleBlockInfo; import org.apache.uniffle.common.ShuffleServerInfo; -import org.apache.uniffle.common.config.RssClientConf; -import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.common.rpc.StatusCode; import org.apache.uniffle.coordinator.CoordinatorConf; @@ -93,11 +91,8 @@ public void createClient() throws Exception { grpcShuffleServerClient = new ShuffleServerGrpcClient( LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)); - RssConf rssConf = new RssConf(); - rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY); nettyShuffleServerClient = new ShuffleServerGrpcNettyClient( - rssConf, LOCALHOST, nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT), nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT)); @@ -109,8 +104,9 @@ public void closeClient() { nettyShuffleServerClient.close(); } - private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() { + private ShuffleClientFactory.ReadClientBuilder baseReadBuilder(boolean isNettyMode) { return ShuffleClientFactory.newReadBuilder() + .clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC) .storageType(StorageType.HDFS.name()) .shuffleId(0) .partitionId(0) @@ -172,7 +168,7 @@ private void hadoopWriteReadTest(boolean isNettyMode) { LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)); ShuffleReadClientImpl readClient = - baseReadBuilder() + baseReadBuilder(isNettyMode) .appId(appId) .basePath(dataBasePath) .blockIdBitmap(bitmaps[0]) @@ -208,7 +204,7 @@ private void hadoopWriteReadTest(boolean isNettyMode) { shuffleServerClient.finishShuffle(rfsr); readClient = - baseReadBuilder() + baseReadBuilder(isNettyMode) .appId(appId) .basePath(dataBasePath) .blockIdBitmap(bitmaps[0]) @@ -218,7 +214,7 @@ private void hadoopWriteReadTest(boolean isNettyMode) { validateResult(readClient, expectedData, bitmaps[0]); readClient = - baseReadBuilder() + baseReadBuilder(isNettyMode) .appId(appId) .partitionId(1) .basePath(dataBasePath) @@ -229,7 +225,7 @@ private void hadoopWriteReadTest(boolean isNettyMode) { validateResult(readClient, expectedData, bitmaps[1]); readClient = - baseReadBuilder() + baseReadBuilder(isNettyMode) .appId(appId) .partitionId(2) .basePath(dataBasePath) @@ -240,7 +236,7 @@ private void hadoopWriteReadTest(boolean isNettyMode) { validateResult(readClient, expectedData, bitmaps[2]); readClient = - baseReadBuilder() + baseReadBuilder(isNettyMode) .appId(appId) .partitionId(3) .basePath(dataBasePath) diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java index 036120499a..77330fe2be 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java @@ -21,7 +21,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; import com.google.common.collect.Lists; @@ -55,8 +54,6 @@ import org.apache.uniffle.common.ShuffleBlockInfo; import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.ShuffleServerInfo; -import org.apache.uniffle.common.config.RssClientConf; -import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.common.rpc.StatusCode; import org.apache.uniffle.common.util.RssUtils; @@ -95,10 +92,6 @@ public class ShuffleServerWithKerberizedHadoopTest extends KerberizedHadoopBase private static ShuffleServerConf grpcShuffleServerConfig; private static ShuffleServerConf nettyShuffleServerConfig; - private static AtomicInteger serverRpcPortCounter = new AtomicInteger(); - private static AtomicInteger nettyPortCounter = new AtomicInteger(); - private static AtomicInteger jettyPortCounter = new AtomicInteger(); - static @TempDir File tempDir; private static ShuffleServerConf getShuffleServerConf(ServerType serverType) throws Exception { @@ -173,11 +166,8 @@ public void beforeEach() throws Exception { new ShuffleServerGrpcClient( LOCALHOST, getShuffleServerConf(ServerType.GRPC).getInteger(ShuffleServerConf.RPC_SERVER_PORT)); - RssConf rssConf = new RssConf(); - rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY); nettyShuffleServerClient = new ShuffleServerGrpcNettyClient( - rssConf, LOCALHOST, getShuffleServerConf(ServerType.GRPC_NETTY) .getInteger(ShuffleServerConf.RPC_SERVER_PORT), @@ -225,8 +215,9 @@ private Map> createTestData( return partitionToBlocks; } - private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() { + private ShuffleClientFactory.ReadClientBuilder baseReadBuilder(boolean isNettyMode) { return ShuffleClientFactory.newReadBuilder() + .clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC) .storageType(StorageType.HDFS.name()) .shuffleId(0) .partitionId(0) @@ -305,7 +296,7 @@ private void hadoopWriteReadTest(boolean isNettyMode) throws Exception { : new ShuffleServerInfo( LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)); ShuffleReadClientImpl readClient = - baseReadBuilder() + baseReadBuilder(isNettyMode) .appId(appId) .basePath(dataBasePath) .blockIdBitmap(bitmaps[0]) @@ -341,7 +332,7 @@ private void hadoopWriteReadTest(boolean isNettyMode) throws Exception { shuffleServerClient.finishShuffle(rfsr); readClient = - baseReadBuilder() + baseReadBuilder(isNettyMode) .appId(appId) .basePath(dataBasePath) .blockIdBitmap(bitmaps[0]) @@ -351,7 +342,7 @@ private void hadoopWriteReadTest(boolean isNettyMode) throws Exception { validateResult(readClient, expectedData, bitmaps[0]); readClient = - baseReadBuilder() + baseReadBuilder(isNettyMode) .appId(appId) .partitionId(1) .basePath(dataBasePath) @@ -362,7 +353,7 @@ private void hadoopWriteReadTest(boolean isNettyMode) throws Exception { validateResult(readClient, expectedData, bitmaps[1]); readClient = - baseReadBuilder() + baseReadBuilder(isNettyMode) .appId(appId) .partitionId(2) .basePath(dataBasePath) @@ -373,7 +364,7 @@ private void hadoopWriteReadTest(boolean isNettyMode) throws Exception { validateResult(readClient, expectedData, bitmaps[2]); readClient = - baseReadBuilder() + baseReadBuilder(isNettyMode) .appId(appId) .partitionId(3) .basePath(dataBasePath) diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java index 3f93da9694..f51bc36499 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java @@ -40,7 +40,6 @@ public class ShuffleServerWithLocalOfExceptionTest extends ShuffleReadWriteBase { private ShuffleServerGrpcClient shuffleServerClient; - private static String REMOTE_STORAGE = HDFS_URI + "rss/test"; private static int rpcPort; diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfLocalOrderTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfLocalOrderTest.java index 2803f05e25..b0ed66d7f9 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfLocalOrderTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfLocalOrderTest.java @@ -48,13 +48,10 @@ import org.apache.uniffle.client.response.RssSendShuffleDataResponse; import org.apache.uniffle.client.util.DefaultIdHelper; import org.apache.uniffle.common.BufferSegment; -import org.apache.uniffle.common.ClientType; import org.apache.uniffle.common.PartitionRange; import org.apache.uniffle.common.RemoteStorageInfo; import org.apache.uniffle.common.ShuffleBlockInfo; import org.apache.uniffle.common.ShuffleDataResult; -import org.apache.uniffle.common.config.RssClientConf; -import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.common.rpc.StatusCode; import org.apache.uniffle.common.segment.LocalOrderSegmentSplitter; @@ -116,11 +113,8 @@ public void createClient() throws Exception { grpcShuffleServerClient = new ShuffleServerGrpcClient( LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)); - RssConf rssConf = new RssConf(); - rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY); nettyShuffleServerClient = new ShuffleServerGrpcNettyClient( - rssConf, LOCALHOST, nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT), nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT)); diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java index 29321b3d5f..a77472e6a8 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java @@ -45,12 +45,9 @@ import org.apache.uniffle.client.request.RssSendShuffleDataRequest; import org.apache.uniffle.client.response.RssSendShuffleDataResponse; import org.apache.uniffle.common.BufferSegment; -import org.apache.uniffle.common.ClientType; import org.apache.uniffle.common.PartitionRange; import org.apache.uniffle.common.ShuffleBlockInfo; import org.apache.uniffle.common.ShuffleDataResult; -import org.apache.uniffle.common.config.RssClientConf; -import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.common.rpc.StatusCode; import org.apache.uniffle.common.util.ChecksumUtils; @@ -110,11 +107,8 @@ public void createClient() throws Exception { grpcShuffleServerClient = new ShuffleServerGrpcClient( LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)); - RssConf rssConf = new RssConf(); - rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY); nettyShuffleServerClient = new ShuffleServerGrpcNettyClient( - rssConf, LOCALHOST, nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT), nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT)); diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHadoopTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHadoopTest.java index 549ae21e93..f969397f80 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHadoopTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHadoopTest.java @@ -43,13 +43,10 @@ import org.apache.uniffle.client.request.RssSendShuffleDataRequest; import org.apache.uniffle.client.response.RssSendShuffleDataResponse; import org.apache.uniffle.common.BufferSegment; -import org.apache.uniffle.common.ClientType; import org.apache.uniffle.common.PartitionRange; import org.apache.uniffle.common.ShuffleBlockInfo; import org.apache.uniffle.common.ShuffleDataResult; import org.apache.uniffle.common.ShuffleServerInfo; -import org.apache.uniffle.common.config.RssClientConf; -import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.common.rpc.StatusCode; import org.apache.uniffle.common.util.ByteBufUtils; @@ -76,7 +73,7 @@ public class ShuffleServerWithMemLocalHadoopTest extends ShuffleReadWriteBase { LoggerFactory.getLogger(ShuffleServerWithMemLocalHadoopTest.class); private ShuffleServerGrpcClient grpcShuffleServerClient; private ShuffleServerGrpcNettyClient nettyShuffleServerClient; - private static String REMOTE_STORAGE = HDFS_URI + "rss/test"; + private static String REMOTE_STORAGE = HDFS_URI + "rss/test_%s"; private static ShuffleServerConf grpcShuffleServerConfig; private static ShuffleServerConf nettyShuffleServerConfig; @@ -107,11 +104,8 @@ public void createClient() throws Exception { grpcShuffleServerClient = new ShuffleServerGrpcClient( LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)); - RssConf rssConf = new RssConf(); - rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY); nettyShuffleServerClient = new ShuffleServerGrpcNettyClient( - rssConf, LOCALHOST, nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT), nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT)); @@ -167,7 +161,10 @@ private void runTest(boolean checkSkippedMetrics, boolean isNettyMode) throws Ex int partitionId = 0; RssRegisterShuffleRequest rrsr = new RssRegisterShuffleRequest( - testAppId, 0, Lists.newArrayList(new PartitionRange(0, 0)), REMOTE_STORAGE); + testAppId, + 0, + Lists.newArrayList(new PartitionRange(0, 0)), + String.format(REMOTE_STORAGE, isNettyMode)); shuffleServerClient.registerShuffle(rrsr); Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf(); Map dataMap = Maps.newHashMap(); @@ -216,7 +213,7 @@ private void runTest(boolean checkSkippedMetrics, boolean isNettyMode) throws Ex 500, expectBlockIds, processBlockIds, - REMOTE_STORAGE, + String.format(REMOTE_STORAGE, isNettyMode), conf); ClientReadHandler[] handlers = new ClientReadHandler[3]; handlers[0] = memoryClientReadHandler; diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java index fb01071f05..a2de8f7611 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java @@ -41,13 +41,10 @@ import org.apache.uniffle.client.request.RssSendShuffleDataRequest; import org.apache.uniffle.client.response.RssSendShuffleDataResponse; import org.apache.uniffle.common.BufferSegment; -import org.apache.uniffle.common.ClientType; import org.apache.uniffle.common.PartitionRange; import org.apache.uniffle.common.ShuffleBlockInfo; import org.apache.uniffle.common.ShuffleDataResult; import org.apache.uniffle.common.ShuffleServerInfo; -import org.apache.uniffle.common.config.RssClientConf; -import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.common.rpc.StatusCode; import org.apache.uniffle.common.util.ByteBufUtils; @@ -112,11 +109,8 @@ public void createClient() throws Exception { grpcShuffleServerClient = new ShuffleServerGrpcClient( LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)); - RssConf rssConf = new RssConf(); - rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY); nettyShuffleServerClient = new ShuffleServerGrpcNettyClient( - rssConf, LOCALHOST, nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT), nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT)); diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java index c905968b3a..de96781c8c 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java @@ -357,6 +357,7 @@ public void writeReadTest() throws Exception { ShuffleReadClientImpl readClient = ShuffleClientFactory.newReadBuilder() + .clientType(ClientType.GRPC) .storageType(StorageType.LOCALFILE.name()) .appId(testAppId) .shuffleId(0) @@ -380,6 +381,7 @@ public void writeReadTest() throws Exception { assertTrue(commitResult); readClient = ShuffleClientFactory.newReadBuilder() + .clientType(ClientType.GRPC) .storageType(StorageType.LOCALFILE.name()) .appId(testAppId) .shuffleId(0) diff --git a/integration-test/mr/src/test/java/org/apache/uniffle/test/DynamicConfTest.java b/integration-test/mr/src/test/java/org/apache/uniffle/test/DynamicConfTest.java index ab1b9de1f7..c5421d113f 100644 --- a/integration-test/mr/src/test/java/org/apache/uniffle/test/DynamicConfTest.java +++ b/integration-test/mr/src/test/java/org/apache/uniffle/test/DynamicConfTest.java @@ -52,6 +52,7 @@ public void dynamicConfTest() throws Exception { @Override protected void updateRssConfiguration(Configuration jobConf) { + jobConf.set(RssMRConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name()); jobConf.setInt(LargeSorter.NUM_MAP_TASKS, 1); jobConf.setInt(LargeSorter.MBS_PER_MAP, 256); } diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithLocalFileRssTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithLocalFileRssTest.java index 29066247e1..4f30c49974 100644 --- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithLocalFileRssTest.java +++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithLocalFileRssTest.java @@ -47,14 +47,25 @@ public static void setupServers(@TempDir File tmpDir) throws Exception { dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE.name()); addDynamicConf(coordinatorConf, dynamicConf); createCoordinatorServer(coordinatorConf); - ShuffleServerConf shuffleServerConf = getShuffleServerConf(ServerType.GRPC); + + ShuffleServerConf grpcShuffleServerConf = getShuffleServerConf(ServerType.GRPC); File dataDir1 = new File(tmpDir, "data1"); File dataDir2 = new File(tmpDir, "data2"); - String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath(); - shuffleServerConf.setString("rss.storage.type", StorageType.LOCALFILE.name()); - shuffleServerConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true); - shuffleServerConf.setString("rss.storage.basePath", basePath); - createShuffleServer(shuffleServerConf); + String grpcBasePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath(); + grpcShuffleServerConf.setString("rss.storage.type", StorageType.LOCALFILE.name()); + grpcShuffleServerConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true); + grpcShuffleServerConf.setString("rss.storage.basePath", grpcBasePath); + createShuffleServer(grpcShuffleServerConf); + + ShuffleServerConf nettyShuffleServerConf = getShuffleServerConf(ServerType.GRPC_NETTY); + File dataDir3 = new File(tmpDir, "data3"); + File dataDir4 = new File(tmpDir, "data4"); + String nettyBasePath = dataDir3.getAbsolutePath() + "," + dataDir4.getAbsolutePath(); + nettyShuffleServerConf.setString("rss.storage.type", StorageType.LOCALFILE.name()); + nettyShuffleServerConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true); + nettyShuffleServerConf.setString("rss.storage.basePath", nettyBasePath); + createShuffleServer(nettyShuffleServerConf); + startServers(); } @@ -76,13 +87,19 @@ public void run() throws Exception { Map resultWithoutRss = runSparkApp(sparkConf, fileName); results.add(resultWithoutRss); - updateSparkConfWithRss(sparkConf); + updateSparkConfWithRssGrpc(sparkConf); updateSparkConfCustomer(sparkConf); for (Codec.Type type : new Codec.Type[] {Codec.Type.NOOP, Codec.Type.ZSTD, Codec.Type.LZ4}) { sparkConf.set("spark." + COMPRESSION_TYPE.key().toLowerCase(), type.name()); Map resultWithRss = runSparkApp(sparkConf, fileName); results.add(resultWithRss); } + updateSparkConfWithRssNetty(sparkConf); + for (Codec.Type type : new Codec.Type[] {Codec.Type.NOOP, Codec.Type.ZSTD, Codec.Type.LZ4}) { + sparkConf.set("spark." + COMPRESSION_TYPE.key().toLowerCase(), type.name()); + Map resultWithRss = runSparkApp(sparkConf, fileName); + results.add(resultWithRss); + } for (int i = 1; i < results.size(); i++) { verifyTestResult(results.get(0), results.get(i)); diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryRssTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryRssTest.java index 5719074081..3444a1ceef 100644 --- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryRssTest.java +++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryRssTest.java @@ -63,7 +63,7 @@ public static void setupServers(@TempDir File tmpDir) throws Exception { public void testMemoryRelease() throws Exception { final String fileName = generateTextFile(10000, 10000); SparkConf sparkConf = createSparkConf(); - updateSparkConfWithRss(sparkConf); + updateSparkConfWithRssGrpc(sparkConf); sparkConf.set("spark.executor.memory", "500m"); sparkConf.set("spark.unsafe.exceptionOnMemoryLeak", "true"); updateRssStorage(sparkConf); diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java index 8414cd0b9b..3cf1736c5a 100644 --- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java +++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java @@ -141,16 +141,16 @@ private void doTestRssShuffleManager( BlockIdLayout clientConfLayout, BlockIdLayout dynamicConfLayout, BlockIdLayout expectedLayout, - boolean enableDynamicCLientConf) + boolean enableDynamicClientConf) throws Exception { Map dynamicConf = startServers(dynamicConfLayout); SparkConf conf = createSparkConf(); - updateSparkConfWithRss(conf); + updateSparkConfWithRssGrpc(conf); // enable stage recompute conf.set("spark." + RssClientConfig.RSS_RESUBMIT_STAGE, "true"); // enable dynamic client conf - conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED, enableDynamicCLientConf); + conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED, enableDynamicClientConf); // configure storage type conf.set("spark." + RssClientConfig.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE.name()); // restarting the coordinator may cause RssException: There isn't enough shuffle servers diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java index c134ced35b..11e60540ee 100644 --- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java +++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java @@ -47,8 +47,6 @@ import org.apache.uniffle.common.PartitionRange; import org.apache.uniffle.common.ShuffleBlockInfo; import org.apache.uniffle.common.ShuffleServerInfo; -import org.apache.uniffle.common.config.RssClientConf; -import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.common.util.BlockId; import org.apache.uniffle.common.util.BlockIdLayout; @@ -111,11 +109,8 @@ public void createClient() throws Exception { grpcShuffleServerClient = new ShuffleServerGrpcClient( LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)); - RssConf rssConf = new RssConf(); - rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY); nettyShuffleServerClient = new ShuffleServerGrpcNettyClient( - rssConf, LOCALHOST, nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT), nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT)); @@ -140,6 +135,7 @@ private ShuffleClientFactory.ReadClientBuilder baseReadBuilder(boolean isNettyMo LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT))); return ShuffleClientFactory.newReadBuilder() + .clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC) .storageType(StorageType.LOCALFILE.name()) .shuffleId(0) .partitionId(0) diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java index 6d48b901c7..e1095e260e 100644 --- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java +++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java @@ -29,6 +29,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.uniffle.common.ClientType; + import static org.junit.jupiter.api.Assertions.assertEquals; public abstract class SparkIntegrationTestBase extends IntegrationTestBase { @@ -54,24 +56,24 @@ public void run() throws Exception { final long durationWithoutRss = System.currentTimeMillis() - start; Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); - updateSparkConfWithRss(sparkConf); + updateSparkConfWithRssGrpc(sparkConf); updateSparkConfCustomer(sparkConf); start = System.currentTimeMillis(); - Map resultWithRss = runSparkApp(sparkConf, fileName); - final long durationWithRss = System.currentTimeMillis() - start; + Map resultWithRssGrpc = runSparkApp(sparkConf, fileName); + final long durationWithRssGrpc = System.currentTimeMillis() - start; updateSparkConfWithRssNetty(sparkConf); start = System.currentTimeMillis(); Map resultWithRssNetty = runSparkApp(sparkConf, fileName); final long durationWithRssNetty = System.currentTimeMillis() - start; - verifyTestResult(resultWithoutRss, resultWithRss); + verifyTestResult(resultWithoutRss, resultWithRssGrpc); verifyTestResult(resultWithoutRss, resultWithRssNetty); LOG.info( "Test: durationWithoutRss[" + durationWithoutRss - + "], durationWithRss[" - + durationWithRss + + "], durationWithRssGrpc[" + + durationWithRssGrpc + "]" + "], durationWithRssNetty[" + durationWithRssNetty @@ -90,16 +92,16 @@ protected Map runSparkApp(SparkConf sparkConf, String testFileName) throws Excep spark.close(); } spark = SparkSession.builder().config(sparkConf).getOrCreate(); - Map resultWithRss = runTest(spark, testFileName); + Map result = runTest(spark, testFileName); spark.stop(); - return resultWithRss; + return result; } protected SparkConf createSparkConf() { return new SparkConf().setAppName(this.getClass().getSimpleName()).setMaster("local[4]"); } - public void updateSparkConfWithRss(SparkConf sparkConf) { + public void updateSparkConfWithRssGrpc(SparkConf sparkConf) { sparkConf.set("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager"); sparkConf.set( "spark.shuffle.sort.io.plugin.class", "org.apache.spark.shuffle.RssShuffleDataIo"); @@ -118,10 +120,11 @@ public void updateSparkConfWithRss(SparkConf sparkConf) { sparkConf.set(RssSparkConfig.RSS_CLIENT_READ_BUFFER_SIZE.key(), "1m"); sparkConf.set(RssSparkConfig.RSS_HEARTBEAT_INTERVAL.key(), "2000"); sparkConf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE.key(), "true"); + sparkConf.set(RssSparkConfig.RSS_CLIENT_TYPE, ClientType.GRPC.name()); } public void updateSparkConfWithRssNetty(SparkConf sparkConf) { - sparkConf.set(RssSparkConfig.RSS_CLIENT_TYPE, "GRPC_NETTY"); + sparkConf.set(RssSparkConfig.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY.name()); } protected void verifyTestResult(Map expected, Map actual) { diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java index 3b02caff2a..1b46e6e959 100644 --- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java +++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java @@ -140,8 +140,19 @@ public void updateSparkConfCustomer(SparkConf sparkConf) { } @Override - public void updateSparkConfWithRss(SparkConf sparkConf) { - super.updateSparkConfWithRss(sparkConf); + public void updateSparkConfWithRssGrpc(SparkConf sparkConf) { + super.updateSparkConfWithRssGrpc(sparkConf); + addMultiReplicaConf(sparkConf); + } + + @Override + public void updateSparkConfWithRssNetty(SparkConf sparkConf) { + super.updateSparkConfWithRssNetty(sparkConf); + // Add multi replica conf + addMultiReplicaConf(sparkConf); + } + + private static void addMultiReplicaConf(SparkConf sparkConf) { // Add multi replica conf sparkConf.set(RssSparkConfig.RSS_DATA_REPLICA.key(), String.valueOf(replicateWrite)); sparkConf.set(RssSparkConfig.RSS_DATA_REPLICA_WRITE.key(), String.valueOf(replicateWrite)); diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java index 75b43e7564..462ee8daba 100644 --- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java +++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java @@ -140,8 +140,8 @@ public void updateSparkConfCustomer(SparkConf sparkConf) { } @Override - public void updateSparkConfWithRss(SparkConf sparkConf) { - super.updateSparkConfWithRss(sparkConf); + public void updateSparkConfWithRssGrpc(SparkConf sparkConf) { + super.updateSparkConfWithRssGrpc(sparkConf); // Add multi replica conf sparkConf.set(RssSparkConfig.RSS_DATA_REPLICA.key(), String.valueOf(replicateWrite)); sparkConf.set(RssSparkConfig.RSS_DATA_REPLICA_WRITE.key(), String.valueOf(replicateWrite)); diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java index f677b63856..0c6860ad12 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicLong; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,9 +59,14 @@ public class ShuffleServerGrpcNettyClient extends ShuffleServerGrpcClient { private int nettyPort; private TransportClientFactory clientFactory; + @VisibleForTesting + public ShuffleServerGrpcNettyClient(String host, int grpcPort, int nettyPort) { + this(new RssConf(), host, grpcPort, nettyPort); + } + public ShuffleServerGrpcNettyClient(RssConf rssConf, String host, int grpcPort, int nettyPort) { this( - rssConf, + rssConf == null ? new RssConf() : rssConf, host, grpcPort, nettyPort, diff --git a/server/src/test/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTaskTest.java b/server/src/test/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTaskTest.java index ab9e8ad025..e4b2559aac 100644 --- a/server/src/test/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTaskTest.java +++ b/server/src/test/java/org/apache/uniffle/server/TopNShuffleDataSizeOfAppCalcTaskTest.java @@ -48,11 +48,8 @@ import org.apache.uniffle.client.request.RssSendShuffleDataRequest; import org.apache.uniffle.client.response.RssRegisterShuffleResponse; import org.apache.uniffle.client.response.RssSendShuffleDataResponse; -import org.apache.uniffle.common.ClientType; import org.apache.uniffle.common.PartitionRange; import org.apache.uniffle.common.ShuffleBlockInfo; -import org.apache.uniffle.common.config.RssClientConf; -import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.metrics.TestUtils; import org.apache.uniffle.common.rpc.ServerType; import org.apache.uniffle.common.rpc.StatusCode; @@ -168,12 +165,9 @@ public static void startServers() throws Exception { private void registerAndRequireBuffer(String appId, int length, boolean isNettyMode) throws Exception { - RssConf rssConf = new RssConf(); - rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY); ShuffleServerGrpcClient shuffleServerClient = isNettyMode ? new ShuffleServerGrpcNettyClient( - rssConf, LOCALHOST, nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT), nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT))