Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public GrpcServer getServer() {

public GrpcServer getServer(ShuffleManagerGrpcService service) {
ServerType type = conf.get(RssBaseConf.RPC_SERVER_TYPE);
if (type == ServerType.GRPC) {
if (type == ServerType.GRPC || type == ServerType.GRPC_NETTY) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not look like fixing flaky tests but like a feature + testing it. Maybe the PR title should be rephrased.

if (service == null) {
service = new ShuffleManagerGrpcService(shuffleManager);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.BlockIdLayout;
Expand Down Expand Up @@ -124,6 +125,7 @@ private RssShuffleDataIterator getDataIterator(
boolean compress) {
ShuffleReadClientImpl readClient =
ShuffleClientFactory.newReadBuilder()
.clientType(ClientType.GRPC)
.storageType(StorageType.HDFS.name())
.appId("appId")
.shuffleId(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,29 @@

package org.apache.uniffle.shuffle.manager;

import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.stream.Stream;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.rpc.ServerType;

import static org.junit.jupiter.api.Assertions.assertThrows;

public class ShuffleManagerServerFactoryTest {
@Test
public void testShuffleManagerServerType() {
private static Stream<Arguments> shuffleManagerServerTypeProvider() {
return Arrays.stream(ServerType.values()).map(Arguments::of);
}

@ParameterizedTest
@MethodSource("shuffleManagerServerTypeProvider")
public void testShuffleManagerServerType(ServerType serverType) {
// add code to generate tests that check the server type
RssBaseConf conf = new RssBaseConf();
conf.set(RssBaseConf.RPC_SERVER_TYPE, ServerType.GRPC);
conf.set(RssBaseConf.RPC_SERVER_TYPE, serverType);
ShuffleManagerServerFactory factory = new ShuffleManagerServerFactory(null, conf);
// this should execute normally;
factory.getServer();

// other types should raise an exception
conf.set(RssBaseConf.RPC_SERVER_TYPE, ServerType.GRPC_NETTY);
factory = new ShuffleManagerServerFactory(null, conf);
assertThrows(UnsupportedOperationException.class, factory::getServer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,13 @@ public ShuffleReadClientImpl(ShuffleClientFactory.ReadClientBuilder builder) {
readBufferSize = Integer.MAX_VALUE;
}
boolean offHeapEnabled = builder.getRssConf().get(RssClientConf.OFF_HEAP_MEMORY_ENABLE);

builder.indexReadLimit(indexReadLimit);
builder.storageType(storageType);
builder.readBufferSize(readBufferSize);
builder.offHeapEnable(offHeapEnabled);
builder.clientType(builder.getRssConf().get(RssClientConf.RSS_CLIENT_TYPE));
if (builder.getClientType() == null) {
builder.clientType(builder.getRssConf().get(RssClientConf.RSS_CLIENT_TYPE));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does the builder not take the client type from the rss conf? Is there a use case wher rss conf has one client type but builder.getClientType() has a different type?

Copy link
Contributor Author

@rickyma rickyma Apr 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an issue of historical oversight in the code; it's not written in a standard way. Normally speaking, if there are fields in the builder, which are not null, we should take from the builder first. If the builder doesn't have them, then we should take from RssConf. In the current production code, even if the builder has the field(which is not null), we still basically take it from RssConf.

And that is the reason why https://github.com/apache/incubator-uniffle/pull/1663/files#diff-2c63c456bb33c64e05cbb946b316f6b26ac9153fc8ef112024e47bf8c3fc3c5aR46 does not fail when we set rss.client.type to GRPC_NETTY. Because when calling the builder's build() method, the client type will be reset to the default value GRPC through RssConf, so there has actually been a hidden bug here all along.

Of course, it's not impossible to remove those fields from the builder and set them all through RssConf, it's just harder to maintain. I think, since we already have those fields in the builder, we should support setting them through the builder, otherwise it's better to remove them altogether.

}
} else {
// most for test
RssConf rssConf = (builder.getRssConf() == null) ? new RssConf() : builder.getRssConf();
Expand All @@ -131,7 +132,9 @@ public ShuffleReadClientImpl(ShuffleClientFactory.ReadClientBuilder builder) {
builder.rssConf(rssConf);
builder.offHeapEnable(false);
builder.expectedTaskIdsBitmapFilterEnable(false);
builder.clientType(rssConf.get(RssClientConf.RSS_CLIENT_TYPE));
if (builder.getClientType() == null) {
builder.clientType(rssConf.get(RssClientConf.RSS_CLIENT_TYPE));
}
}
if (builder.getIdHelper() == null) {
builder.idHelper(new DefaultIdHelper(BlockIdLayout.from(builder.getRssConf())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.uniffle.client.TestUtils;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
Expand Down Expand Up @@ -64,6 +65,7 @@ public class ShuffleReadClientImplTest extends HadoopTestBase {

private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() {
return ShuffleClientFactory.newReadBuilder()
.clientType(ClientType.GRPC)
.storageType(StorageType.HDFS.name())
.appId("appId")
.shuffleId(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.coordinator.CoordinatorConf;
Expand Down Expand Up @@ -99,11 +97,8 @@ public void createClient(@TempDir File serverTmpDir) throws Exception {
grpcShuffleServerClient =
new ShuffleServerGrpcClient(
LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT));
RssConf rssConf = new RssConf();
rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY);
nettyShuffleServerClient =
new ShuffleServerGrpcNettyClient(
rssConf,
LOCALHOST,
nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT),
nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT));
Expand Down Expand Up @@ -193,6 +188,7 @@ private void diskErrorTest(boolean isNettyMode) throws Exception {
isNettyMode ? nettyShuffleServerInfoList : grpcShuffleServerInfoList;
ShuffleReadClientImpl readClient =
ShuffleClientFactory.newReadBuilder()
.clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC)
.storageType(StorageType.LOCALFILE.name())
.appId(appId)
.shuffleId(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.junit.jupiter.params.provider.MethodSource;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

import org.apache.uniffle.client.api.ShuffleServerClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.factory.ShuffleServerClientFactory;
import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
Expand All @@ -49,8 +50,6 @@
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;
Expand All @@ -63,19 +62,16 @@ public abstract class HybridStorageFaultToleranceBase extends ShuffleReadWriteBa
protected ShuffleServerGrpcNettyClient nettyShuffleServerClient;
protected static ShuffleServerConf grpcShuffleServerConfig;
protected static ShuffleServerConf nettyShuffleServerConfig;
private static String REMOTE_STORAGE = HDFS_URI + "rss/multi_storage_fault";
private static String REMOTE_STORAGE = HDFS_URI + "rss/multi_storage_fault_%s";

@BeforeEach
public void createClient() throws Exception {
ShuffleServerClientFactory.getInstance().cleanupCache();
grpcShuffleServerClient =
new ShuffleServerGrpcClient(
LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT));
RssConf rssConf = new RssConf();
rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY);
nettyShuffleServerClient =
new ShuffleServerGrpcNettyClient(
rssConf,
LOCALHOST,
nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT),
nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT));
Expand All @@ -100,7 +96,7 @@ private void fallbackTest(boolean isNettyMode) throws Exception {
Map<Long, byte[]> expectedData = Maps.newHashMap();
Map<Integer, List<Integer>> map = Maps.newHashMap();
map.put(0, Lists.newArrayList(0));
registerShuffle(appId, map);
registerShuffle(appId, map, isNettyMode);
Roaring64NavigableMap blockBitmap = Roaring64NavigableMap.bitmapOf();
final List<ShuffleBlockInfo> blocks =
createShuffleBlockList(0, 0, 0, 40, 2 * 1024 * 1024, blockBitmap, expectedData);
Expand All @@ -110,16 +106,19 @@ private void fallbackTest(boolean isNettyMode) throws Exception {
appId, 0, 0, blockBitmap, Roaring64NavigableMap.bitmapOf(0), expectedData, isNettyMode);
}

private void registerShuffle(String appId, Map<Integer, List<Integer>> registerMap) {
private void registerShuffle(
String appId, Map<Integer, List<Integer>> registerMap, boolean isNettyMode) {
ShuffleServerClient shuffleServerClient =
isNettyMode ? nettyShuffleServerClient : grpcShuffleServerClient;
for (Map.Entry<Integer, List<Integer>> entry : registerMap.entrySet()) {
for (int partition : entry.getValue()) {
RssRegisterShuffleRequest rr =
new RssRegisterShuffleRequest(
appId,
entry.getKey(),
Lists.newArrayList(new PartitionRange(partition, partition)),
REMOTE_STORAGE);
grpcShuffleServerClient.registerShuffle(rr);
String.format(REMOTE_STORAGE, isNettyMode));
shuffleServerClient.registerShuffle(rr);
}
}
}
Expand Down Expand Up @@ -171,6 +170,7 @@ protected void validateResult(
LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT));
ShuffleReadClientImpl readClient =
ShuffleClientFactory.newReadBuilder()
.clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC)
.storageType(StorageType.LOCALFILE_HDFS.name())
.appId(appId)
.shuffleId(shuffleId)
Expand All @@ -179,7 +179,7 @@ protected void validateResult(
.partitionNumPerRange(1)
.partitionNum(10)
.readBufferSize(1000)
.basePath(REMOTE_STORAGE)
.basePath(String.format(REMOTE_STORAGE, isNettyMode))
.blockIdBitmap(blockBitmap)
.taskIdBitmap(taskBitmap)
.shuffleServerInfoList(Lists.newArrayList(ssi))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@
import org.apache.uniffle.client.factory.ShuffleServerClientFactory;
import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcNettyClient;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServerConf;
Expand Down Expand Up @@ -83,11 +80,8 @@ public void createClient() throws Exception {
grpcShuffleServerClient =
new ShuffleServerGrpcClient(
LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT));
RssConf rssConf = new RssConf();
rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY);
nettyShuffleServerClient =
new ShuffleServerGrpcNettyClient(
rssConf,
LOCALHOST,
nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT),
nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@
import org.apache.uniffle.client.factory.ShuffleServerClientFactory;
import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcNettyClient;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServerConf;
Expand Down Expand Up @@ -68,11 +65,8 @@ public void createClient() throws Exception {
grpcShuffleServerClient =
new ShuffleServerGrpcClient(
LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT));
RssConf rssConf = new RssConf();
rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY);
nettyShuffleServerClient =
new ShuffleServerGrpcNettyClient(
rssConf,
LOCALHOST,
nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT),
nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public class QuorumTest extends ShuffleReadWriteBase {

private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() {
return ShuffleClientFactory.newReadBuilder()
.clientType(ClientType.GRPC)
.storageType(StorageType.MEMORY_LOCALFILE.name())
.shuffleId(0)
.partitionId(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class RpcClientRetryTest extends ShuffleReadWriteBase {

private ShuffleClientFactory.ReadClientBuilder baseReadBuilder(StorageType storageType) {
return ShuffleClientFactory.newReadBuilder()
.clientType(ClientType.GRPC)
.storageType(storageType.name())
.shuffleId(0)
.partitionId(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
Expand Down Expand Up @@ -177,6 +178,7 @@ public void testConcurrentWrite2Hadoop(
});
ShuffleReadClientImpl readClient =
ShuffleClientFactory.newReadBuilder()
.clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC)
.storageType(StorageType.HDFS.name())
.appId(appId)
.shuffleId(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.util.ByteBufUtils;
import org.apache.uniffle.coordinator.CoordinatorConf;
Expand Down Expand Up @@ -88,16 +86,14 @@ public void setupServers(@TempDir File tmpDir) throws Exception {
startServers();
grpcShuffleServerClients = new ArrayList<>();
nettyShuffleServerClients = new ArrayList<>();
RssConf rssConf = new RssConf();
rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY);
for (ShuffleServer shuffleServer : grpcShuffleServers) {
grpcShuffleServerClients.add(
new ShuffleServerGrpcClient(shuffleServer.getIp(), shuffleServer.getGrpcPort()));
}
for (ShuffleServer shuffleServer : nettyShuffleServers) {
nettyShuffleServerClients.add(
new ShuffleServerGrpcNettyClient(
rssConf, LOCALHOST, shuffleServer.getGrpcPort(), shuffleServer.getNettyPort()));
LOCALHOST, shuffleServer.getGrpcPort(), shuffleServer.getNettyPort()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,12 @@
import org.apache.uniffle.client.response.RssRegisterShuffleResponse;
import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.metrics.TestUtils;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.rpc.StatusCode;
Expand Down Expand Up @@ -149,11 +146,8 @@ public void createClient() throws Exception {
grpcShuffleServerClient =
new ShuffleServerGrpcClient(
LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT));
RssConf rssConf = new RssConf();
rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY);
nettyShuffleServerClient =
new ShuffleServerGrpcNettyClient(
rssConf,
LOCALHOST,
nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT),
nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT));
Expand Down
Loading