Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.Constants;
Expand Down Expand Up @@ -220,10 +221,14 @@ public Thread newThread(Runnable r) {
}
LOG.info("Start to register shuffle");
long start = System.currentTimeMillis();
serverToPartitionRanges.entrySet().forEach(entry -> {
client.registerShuffle(
entry.getKey(), appId, 0, entry.getValue(), remoteStorage);
});
serverToPartitionRanges.entrySet().forEach(entry -> client.registerShuffle(
entry.getKey(),
appId,
0,
entry.getValue(),
remoteStorage,
ShuffleDataDistributionType.NORMAL
));
LOG.info("Finish register shuffle with " + (System.currentTimeMillis() - start) + " ms");
return shuffleAssignments;
}, retryInterval, retryTimes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
Expand Down Expand Up @@ -290,7 +291,8 @@ public void registerShuffle(
String appId,
int shuffleId,
List<PartitionRange> partitionRanges,
RemoteStorageInfo remoteStorage) {
RemoteStorageInfo remoteStorage,
ShuffleDataDistributionType distributionType) {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.compression.Codec;
import org.apache.uniffle.common.compression.Lz4Codec;
Expand Down Expand Up @@ -386,7 +387,8 @@ public void registerShuffle(
String appId,
int shuffleId,
List<PartitionRange> partitionRanges,
RemoteStorageInfo storageType) {
RemoteStorageInfo storageType,
ShuffleDataDistributionType distributionType) {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.RetryUtils;
Expand Down Expand Up @@ -279,7 +280,13 @@ protected void registerShuffleServers(
.stream()
.forEach(entry -> {
shuffleWriteClient.registerShuffle(
entry.getKey(), appId, shuffleId, entry.getValue(), remoteStorage);
entry.getKey(),
appId,
shuffleId,
entry.getValue(),
remoteStorage,
ShuffleDataDistributionType.NORMAL
);
});
LOG.info("Finish register shuffleId[" + shuffleId + "] with " + (System.currentTimeMillis() - start) + " ms");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.RetryUtils;
import org.apache.uniffle.common.util.RssUtils;
Expand Down Expand Up @@ -92,6 +94,7 @@ public class RssShuffleManager implements ShuffleManager {
private ScheduledExecutorService heartBeatScheduledExecutorService;
private boolean heartbeatStarted = false;
private boolean dynamicConfEnabled = false;
private final ShuffleDataDistributionType dataDistributionType;
private final EventLoop eventLoop;
private final EventLoop defaultEventLoop = new EventLoop<AddBlockEvent>("ShuffleDataQueue") {

Expand Down Expand Up @@ -155,6 +158,7 @@ public RssShuffleManager(SparkConf conf, boolean isDriver) {
final int retryMax = sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_MAX);
this.clientType = sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);
this.dynamicConfEnabled = sparkConf.get(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED);
this.dataDistributionType = RssSparkConfig.toRssConf(sparkConf).get(RssClientConf.DATA_DISTRIBUTION_TYPE);
long retryIntervalMax = sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX);
int heartBeatThreadNum = sparkConf.get(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM);
this.dataTransferPoolSize = sparkConf.get(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE);
Expand Down Expand Up @@ -205,6 +209,7 @@ public RssShuffleManager(SparkConf conf, boolean isDriver) {
Map<String, Set<Long>> taskToFailedBlockIds) {
this.sparkConf = conf;
this.clientType = sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);
this.dataDistributionType = RssSparkConfig.toRssConf(sparkConf).get(RssClientConf.DATA_DISTRIBUTION_TYPE);
this.heartbeatInterval = sparkConf.get(RssSparkConfig.RSS_HEARTBEAT_INTERVAL);
this.heartbeatTimeout = sparkConf.getLong(RssSparkConfig.RSS_HEARTBEAT_TIMEOUT.key(), heartbeatInterval / 2);
this.dataReplica = sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA);
Expand Down Expand Up @@ -460,7 +465,9 @@ public <K, C> ShuffleReader<K, C> getReaderImpl(
RssUtils.generatePartitionToBitmap(blockIdBitmap, startPartition, endPartition),
taskIdBitmap,
readMetrics,
RssSparkConfig.toRssConf(sparkConf));
RssSparkConfig.toRssConf(sparkConf),
dataDistributionType
);
}

private Roaring64NavigableMap getExpectedTasksByExecutorId(
Expand Down Expand Up @@ -621,7 +628,9 @@ protected void registerShuffleServers(
appId,
shuffleId,
entry.getValue(),
remoteStorage);
remoteStorage,
dataDistributionType
);
});
LOG.info("Finish register shuffleId[" + shuffleId + "] with " + (System.currentTimeMillis() - start) + " ms");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.request.CreateShuffleReadClientRequest;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssConf;

Expand Down Expand Up @@ -76,6 +77,7 @@ public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
private int mapEndIndex;
private ShuffleReadMetrics readMetrics;
private RssConf rssConf;
private ShuffleDataDistributionType dataDistributionType;

public RssShuffleReader(
int startPartition,
Expand All @@ -93,7 +95,8 @@ public RssShuffleReader(
Map<Integer, Roaring64NavigableMap> partitionToExpectBlocks,
Roaring64NavigableMap taskIdBitmap,
ShuffleReadMetrics readMetrics,
RssConf rssConf) {
RssConf rssConf,
ShuffleDataDistributionType dataDistributionType) {
this.appId = rssShuffleHandle.getAppId();
this.startPartition = startPartition;
this.endPartition = endPartition;
Expand All @@ -115,6 +118,7 @@ public RssShuffleReader(
this.readMetrics = readMetrics;
this.partitionToShuffleServers = rssShuffleHandle.getPartitionToServers();
this.rssConf = rssConf;
this.dataDistributionType = dataDistributionType;
}

@Override
Expand Down Expand Up @@ -201,7 +205,8 @@ class MultiPartitionIterator<K, C> extends AbstractIterator<Product2<K, C>> {
List<ShuffleServerInfo> shuffleServerInfoList = partitionToShuffleServers.get(partition);
CreateShuffleReadClientRequest request = new CreateShuffleReadClientRequest(
appId, shuffleId, partition, storageType, basePath, indexReadLimit, readBufferSize,
1, partitionNum, partitionToExpectBlocks.get(partition), taskIdBitmap, shuffleServerInfoList, hadoopConf);
1, partitionNum, partitionToExpectBlocks.get(partition), taskIdBitmap, shuffleServerInfoList,
hadoopConf, dataDistributionType);
ShuffleReadClient shuffleReadClient = ShuffleClientFactory.getInstance().createShuffleReadClient(request);
RssShuffleDataIterator iterator = new RssShuffleDataIterator<K, C>(
shuffleDependency.serializer(), shuffleReadClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import scala.Option;

import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.storage.handler.impl.HdfsShuffleWriteHandler;
import org.apache.uniffle.storage.util.StorageType;
Expand Down Expand Up @@ -94,7 +95,10 @@ public void readTest() throws Exception {
1,
partitionToExpectBlocks,
taskIdBitmap,
new ShuffleReadMetrics(), new RssConf()));
new ShuffleReadMetrics(),
new RssConf(),
ShuffleDataDistributionType.NORMAL
));
validateResult(rssShuffleReaderSpy.read(), expectedData, 10);

writeTestData(writeHandler1, 2, 4, expectedData,
Expand All @@ -115,8 +119,10 @@ public void readTest() throws Exception {
2,
partitionToExpectBlocks,
taskIdBitmap,
new ShuffleReadMetrics(), new RssConf())
);
new ShuffleReadMetrics(),
new RssConf(),
ShuffleDataDistributionType.NORMAL
));
validateResult(rssShuffleReaderSpy1.read(), expectedData, 18);

RssShuffleReader rssShuffleReaderSpy2 = spy(new RssShuffleReader<String, String>(
Expand All @@ -134,7 +140,10 @@ public void readTest() throws Exception {
2,
partitionToExpectBlocks,
Roaring64NavigableMap.bitmapOf(),
new ShuffleReadMetrics(), new RssConf()));
new ShuffleReadMetrics(),
new RssConf(),
ShuffleDataDistributionType.NORMAL
));
validateResult(rssShuffleReaderSpy2.read(), Maps.newHashMap(), 0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;

public interface ShuffleWriteClient {
Expand All @@ -41,7 +42,8 @@ void registerShuffle(
String appId,
int shuffleId,
List<PartitionRange> partitionRanges,
RemoteStorageInfo remoteStorage);
RemoteStorageInfo remoteStorage,
ShuffleDataDistributionType dataDistributionType);

boolean sendCommit(Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int shuffleId, int numMaps);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,6 @@ public ShuffleReadClient createShuffleReadClient(CreateShuffleReadClientRequest
request.getPartitionId(), request.getIndexReadLimit(), request.getPartitionNumPerRange(),
request.getPartitionNum(), request.getReadBufferSize(), request.getBasePath(),
request.getBlockIdBitmap(), request.getTaskIdBitmap(), request.getShuffleServerInfoList(),
request.getHadoopConf(), request.getIdHelper());
request.getHadoopConf(), request.getIdHelper(), request.getShuffleDataDistributionType());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.uniffle.client.response.CompressedShuffleBlock;
import org.apache.uniffle.client.util.IdHelper;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
Expand All @@ -60,6 +61,29 @@ public class ShuffleReadClientImpl implements ShuffleReadClient {
private AtomicLong crcCheckTime = new AtomicLong(0);
private ClientReadHandler clientReadHandler;
private final IdHelper idHelper;
private ShuffleDataDistributionType dataDistributionType = ShuffleDataDistributionType.NORMAL;

public ShuffleReadClientImpl(
String storageType,
String appId,
int shuffleId,
int partitionId,
int indexReadLimit,
int partitionNumPerRange,
int partitionNum,
int readBufferSize,
String storageBasePath,
Roaring64NavigableMap blockIdBitmap,
Roaring64NavigableMap taskIdBitmap,
List<ShuffleServerInfo> shuffleServerInfoList,
Configuration hadoopConf,
IdHelper idHelper,
ShuffleDataDistributionType dataDistributionType) {
this(storageType, appId, shuffleId, partitionId, indexReadLimit,
partitionNumPerRange, partitionNum, readBufferSize, storageBasePath,
blockIdBitmap, taskIdBitmap, shuffleServerInfoList, hadoopConf, idHelper);
this.dataDistributionType = dataDistributionType;
}

public ShuffleReadClientImpl(
String storageType,
Expand Down Expand Up @@ -96,6 +120,8 @@ public ShuffleReadClientImpl(
request.setHadoopConf(hadoopConf);
request.setExpectBlockIds(blockIdBitmap);
request.setProcessBlockIds(processedBlockIds);
request.setDistributionType(dataDistributionType);
request.setExpectTaskIds(taskIdBitmap);

List<Long> removeBlockIds = Lists.newArrayList();
blockIdBitmap.forEach(bid -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.ThreadUtils;
Expand Down Expand Up @@ -332,7 +333,8 @@ public void registerShuffle(
String appId,
int shuffleId,
List<PartitionRange> partitionRanges,
RemoteStorageInfo remoteStorage) {
RemoteStorageInfo remoteStorage,
ShuffleDataDistributionType dataDistributionType) {
String user = null;
try {
user = UserGroupInformation.getCurrentUser().getShortUserName();
Expand All @@ -342,7 +344,7 @@ public void registerShuffle(
LOG.info("User: {}", user);

RssRegisterShuffleRequest request =
new RssRegisterShuffleRequest(appId, shuffleId, partitionRanges, remoteStorage, user);
new RssRegisterShuffleRequest(appId, shuffleId, partitionRanges, remoteStorage, user, dataDistributionType);
RssRegisterShuffleResponse response = getShuffleServerClient(shuffleServerInfo).registerShuffle(request);

String msg = "Error happened when registerShuffle with appId[" + appId + "], shuffleId[" + shuffleId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.client.util.IdHelper;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;

public class CreateShuffleReadClientRequest {
Expand All @@ -42,6 +43,28 @@ public class CreateShuffleReadClientRequest {
private List<ShuffleServerInfo> shuffleServerInfoList;
private Configuration hadoopConf;
private IdHelper idHelper;
private ShuffleDataDistributionType shuffleDataDistributionType = ShuffleDataDistributionType.NORMAL;

public CreateShuffleReadClientRequest(
String appId,
int shuffleId,
int partitionId,
String storageType,
String basePath,
int indexReadLimit,
int readBufferSize,
int partitionNumPerRange,
int partitionNum,
Roaring64NavigableMap blockIdBitmap,
Roaring64NavigableMap taskIdBitmap,
List<ShuffleServerInfo> shuffleServerInfoList,
Configuration hadoopConf,
ShuffleDataDistributionType dataDistributionType) {
this(appId, shuffleId, partitionId, storageType, basePath, indexReadLimit, readBufferSize,
partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, shuffleServerInfoList,
hadoopConf, new DefaultIdHelper());
this.shuffleDataDistributionType = dataDistributionType;
}

public CreateShuffleReadClientRequest(
String appId,
Expand Down Expand Up @@ -148,4 +171,8 @@ public Configuration getHadoopConf() {
public IdHelper getIdHelper() {
return idHelper;
}

public ShuffleDataDistributionType getShuffleDataDistributionType() {
return shuffleDataDistributionType;
}
}
Loading