Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ public class RssShuffleManager implements ShuffleManager {
private final int dataCommitPoolSize;
private boolean heartbeatStarted = false;
private boolean dynamicConfEnabled = false;
private RemoteStorageInfo remoteStorage;
private ThreadPoolExecutor threadPoolExecutor;
private EventLoop eventLoop = new EventLoop<AddBlockEvent>("ShuffleDataQueue") {

Expand Down Expand Up @@ -213,9 +212,10 @@ public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, int numMaps, Shuff
}

String storageType = sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
remoteStorage = new RemoteStorageInfo(sparkConf.get(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key(), ""));
remoteStorage = ClientUtils.fetchRemoteStorage(
appId, remoteStorage, dynamicConfEnabled, storageType, shuffleWriteClient);
RemoteStorageInfo defaultRemoteStorage = new RemoteStorageInfo(
sparkConf.get(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key(), ""));
RemoteStorageInfo remoteStorage = ClientUtils.fetchRemoteStorage(
appId, defaultRemoteStorage, dynamicConfEnabled, storageType, shuffleWriteClient);

int partitionNumPerRange = sparkConf.get(RssSparkConfig.RSS_PARTITION_NUM_PER_RANGE);

Expand All @@ -233,7 +233,7 @@ public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, int numMaps, Shuff
ShuffleAssignmentsInfo response = shuffleWriteClient.getShuffleAssignments(
appId, shuffleId, dependency.partitioner().numPartitions(),
partitionNumPerRange, assignmentTags, requiredShuffleServerNumber);
registerShuffleServers(appId, shuffleId, response.getServerToPartitionRanges());
registerShuffleServers(appId, shuffleId, response.getServerToPartitionRanges(), remoteStorage);
return response.getPartitionToServers();
}, retryInterval, retryTimes);
} catch (Throwable throwable) {
Expand Down Expand Up @@ -268,7 +268,8 @@ private void startHeartbeat() {
protected void registerShuffleServers(
String appId,
int shuffleId,
Map<ShuffleServerInfo, List<PartitionRange>> serverToPartitionRanges) {
Map<ShuffleServerInfo, List<PartitionRange>> serverToPartitionRanges,
RemoteStorageInfo remoteStorage) {
if (serverToPartitionRanges == null || serverToPartitionRanges.isEmpty()) {
return;
}
Expand Down Expand Up @@ -480,8 +481,4 @@ public void setAppId(String appId) {
this.appId = appId;
}

@VisibleForTesting
public void setRemoteStorage(RemoteStorageInfo remoteStorage) {
this.remoteStorage = remoteStorage;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ public class RssShuffleManager implements ShuffleManager {
private ScheduledExecutorService heartBeatScheduledExecutorService;
private boolean heartbeatStarted = false;
private boolean dynamicConfEnabled = false;
private RemoteStorageInfo remoteStorage;
private final EventLoop eventLoop;
private final EventLoop defaultEventLoop = new EventLoop<AddBlockEvent>("ShuffleDataQueue") {

Expand Down Expand Up @@ -266,10 +265,10 @@ public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, ShuffleDependency<
LOG.info("Generate application id used in rss: " + id.get());

String storageType = sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
remoteStorage = new RemoteStorageInfo(
RemoteStorageInfo defaultRemoteStorage = new RemoteStorageInfo(
sparkConf.get(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key(), ""));
remoteStorage = ClientUtils.fetchRemoteStorage(
id.get(), remoteStorage, dynamicConfEnabled, storageType, shuffleWriteClient);
RemoteStorageInfo remoteStorage = ClientUtils.fetchRemoteStorage(
id.get(), defaultRemoteStorage, dynamicConfEnabled, storageType, shuffleWriteClient);

Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);

Expand All @@ -288,7 +287,7 @@ public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, ShuffleDependency<
1,
assignmentTags,
requiredShuffleServerNumber);
registerShuffleServers(id.get(), shuffleId, response.getServerToPartitionRanges());
registerShuffleServers(id.get(), shuffleId, response.getServerToPartitionRanges(), remoteStorage);
return response.getPartitionToServers();
}, retryInterval, retryTimes);
} catch (Throwable throwable) {
Expand Down Expand Up @@ -606,7 +605,8 @@ protected void registerShuffleServers(
String appId,
int shuffleId,
Map<ShuffleServerInfo,
List<PartitionRange>> serverToPartitionRanges) {
List<PartitionRange>> serverToPartitionRanges,
RemoteStorageInfo remoteStorage) {
if (serverToPartitionRanges == null || serverToPartitionRanges.isEmpty()) {
return;
}
Expand Down Expand Up @@ -728,11 +728,6 @@ public void setAppId(String appId) {
this.id = new AtomicReference<>(appId);
}

@VisibleForTesting
public void setRemoteStorage(RemoteStorageInfo remoteStorage) {
this.remoteStorage = remoteStorage;
}

public String getId() {
return id.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ public void test() throws Exception {
assertNull(commonHadoopConf.get("k2"));

// mock the scenario that get reader in an executor
rssShuffleManager.setRemoteStorage(null);
rssShuffleReader = (RssShuffleReader) rssShuffleManager.getReader(
rssShuffleHandle, 0, 0, mockTaskContextImpl);
hadoopConf = rssShuffleReader.getHadoopConf();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,6 @@ public void test() throws Exception {
assertNull(commonHadoopConf.get("k1"));
assertNull(commonHadoopConf.get("k2"));

// mock the scenario that get reader in an executor
rssShuffleManager.setRemoteStorage(null);
rssShuffleReader = (RssShuffleReader) rssShuffleManager.getReader(
rssShuffleHandle, 0, 0, new MockTaskContext(), new TempShuffleReadMetrics());
hadoopConf = rssShuffleReader.getHadoopConf();
Expand Down