Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,12 @@ public class RssMRConfig {
public static final int RSS_ACCESS_TIMEOUT_MS_DEFAULT_VALUE = RssClientConfig.RSS_ACCESS_TIMEOUT_MS_DEFAULT_VALUE;

public static final String RSS_CLIENT_ASSIGNMENT_TAGS =
MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_TAGS;
MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_TAGS;

// MR job configuration key (and its default) for the number of shuffle servers the client asks
// the coordinator to assign. The key carries MR_RSS_CONFIG_PREFIX so that it follows the same
// naming scheme as the other client keys declared in this class (e.g. RSS_CLIENT_ASSIGNMENT_TAGS).
public static final String RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER =
    MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER;
// Default is taken unchanged from the shared client configuration.
public static final int RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER_DEFAULT_VALUE =
    RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER_DEFAULT_VALUE;

public static final String RSS_CONF_FILE = "rss_conf.xml";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,23 @@ public static void main(String[] args) {
}
assignmentTags.add(Constants.SHUFFLE_SERVER_VERSION);

int requiredAssignmentShuffleServersNum = conf.getInt(
RssMRConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER,
RssMRConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER_DEFAULT_VALUE
);

ApplicationAttemptId applicationAttemptId = RssMRUtils.getApplicationAttemptId();
String appId = applicationAttemptId.toString();
ShuffleAssignmentsInfo response = client.getShuffleAssignments(
appId, 0, numReduceTasks, 1, Sets.newHashSet(assignmentTags));

ShuffleAssignmentsInfo response =
client.getShuffleAssignments(
appId,
0,
numReduceTasks,
1,
Sets.newHashSet(assignmentTags),
requiredAssignmentShuffleServersNum
);

Map<ShuffleServerInfo, List<PartitionRange>> serverToPartitionRanges = response.getServerToPartitionRanges();
final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ public void reportShuffleResult(Map<Integer, List<ShuffleServerInfo>> partitionT
}

@Override
public ShuffleAssignmentsInfo getShuffleAssignments(String appId, int shuffleId, int partitionNum, int partitionNumPerRange, Set<String> requiredTags) {
public ShuffleAssignmentsInfo getShuffleAssignments(String appId, int shuffleId, int partitionNum, int partitionNumPerRange, Set<String> requiredTags, int assignmentShuffleServerNumber) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ public void reportShuffleResult(Map<Integer, List<ShuffleServerInfo>> partitionT
}

@Override
public ShuffleAssignmentsInfo getShuffleAssignments(String appId, int shuffleId, int partitionNum, int partitionNumPerRange, Set<String> requiredTags) {
public ShuffleAssignmentsInfo getShuffleAssignments(String appId, int shuffleId, int partitionNum, int partitionNumPerRange, Set<String> requiredTags, int assignmentShuffleServerNumber) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@ public class RssSparkConfig {
+ "whether this conf is set or not"))
.createWithDefault("");

// Spark-side config entry for the number of shuffle servers the client requests per assignment.
// The key is the shared RssClientConfig key with SPARK_RSS_CONFIG_PREFIX prepended, and the
// default is the shared RssClientConfig default value.
public static final ConfigEntry<Integer> RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER = createIntegerBuilder(
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER))
.createWithDefault(RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER_DEFAULT_VALUE);

public static final ConfigEntry<String> RSS_COORDINATOR_QUORUM = createStringBuilder(
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_COORDINATOR_QUORUM)
.doc("Coordinator quorum"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,11 @@ public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, int numMaps, Shuff
// get all register info according to coordinator's response
Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);

int requiredShuffleServerNumber = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER);

ShuffleAssignmentsInfo response = shuffleWriteClient.getShuffleAssignments(
appId, shuffleId, dependency.partitioner().numPartitions(),
partitionNumPerRange, assignmentTags);
partitionNumPerRange, assignmentTags, requiredShuffleServerNumber);
Map<Integer, List<ShuffleServerInfo>> partitionToServers = response.getPartitionToServers();

startHeartbeat();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,12 +256,15 @@ public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, ShuffleDependency<

Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);

int requiredShuffleServerNumber = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER);

ShuffleAssignmentsInfo response = shuffleWriteClient.getShuffleAssignments(
id.get(),
shuffleId,
dependency.partitioner().numPartitions(),
1,
assignmentTags);
assignmentTags,
requiredShuffleServerNumber);
Map<Integer, List<ShuffleServerInfo>> partitionToServers = response.getPartitionToServers();

startHeartbeat();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ void reportShuffleResult(
int bitmapNum);

ShuffleAssignmentsInfo getShuffleAssignments(String appId, int shuffleId, int partitionNum,
int partitionNumPerRange, Set<String> requiredTags);
int partitionNumPerRange, Set<String> requiredTags, int assignmentShuffleServerNumber);

Roaring64NavigableMap getShuffleResult(String clientType, Set<ShuffleServerInfo> shuffleServerInfoSet,
String appId, int shuffleId, int partitionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,9 +375,9 @@ public RemoteStorageInfo fetchRemoteStorage(String appId) {

@Override
public ShuffleAssignmentsInfo getShuffleAssignments(String appId, int shuffleId, int partitionNum,
int partitionNumPerRange, Set<String> requiredTags) {
int partitionNumPerRange, Set<String> requiredTags, int assignmentShuffleServerNumber) {
RssGetShuffleAssignmentsRequest request = new RssGetShuffleAssignmentsRequest(
appId, shuffleId, partitionNum, partitionNumPerRange, replica, requiredTags);
appId, shuffleId, partitionNum, partitionNumPerRange, replica, requiredTags, assignmentShuffleServerNumber);

RssGetShuffleAssignmentsResponse response = new RssGetShuffleAssignmentsResponse(ResponseStatusCode.INTERNAL_ERROR);
for (CoordinatorClient coordinatorClient : coordinatorClients) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,8 @@ public class RssClientConfig {
public static final int RSS_ACCESS_TIMEOUT_MS_DEFAULT_VALUE = 10000;
public static final String RSS_DYNAMIC_CLIENT_CONF_ENABLED = "rss.dynamicClientConf.enabled";
public static final boolean RSS_DYNAMIC_CLIENT_CONF_ENABLED_DEFAULT_VALUE = true;

// Un-prefixed key for the number of shuffle servers a client requests from the coordinator when
// asking for shuffle assignments; engine-specific config classes add their own prefix to it.
public static final String RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER =
"rss.client.assignment.shuffle.nodes.max";
// Default of -1 is a non-positive value; the assignment strategies in this change only honor a
// requested number that is greater than 0 and below the coordinator's own maximum, and otherwise
// fall back to the coordinator's maximum.
public static final int RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER_DEFAULT_VALUE = -1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@
public interface AssignmentStrategy {

PartitionRangeAssignment assign(int totalPartitionNum, int partitionNumPerRange,
int replica, Set<String> requiredTags);
int replica, Set<String> requiredTags, int requiredShuffleServerNumber);

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,14 @@ public BasicAssignmentStrategy(ClusterManager clusterManager) {

@Override
public PartitionRangeAssignment assign(int totalPartitionNum, int partitionNumPerRange,
int replica, Set<String> requiredTags) {
int replica, Set<String> requiredTags, int requiredShuffleServerNumber) {
List<PartitionRange> ranges = CoordinatorUtils.generateRanges(totalPartitionNum, partitionNumPerRange);
int shuffleNodesMax = clusterManager.getShuffleNodesMax();
List<ServerNode> servers = getRequiredServers(requiredTags, shuffleNodesMax);
int expectedShuffleNodesNum = shuffleNodesMax;
if (requiredShuffleServerNumber < shuffleNodesMax && requiredShuffleServerNumber > 0) {
expectedShuffleNodesNum = requiredShuffleServerNumber;
}
List<ServerNode> servers = getRequiredServers(requiredTags, expectedShuffleNodesNum);
if (servers.isEmpty() || servers.size() < replica) {
return new PartitionRangeAssignment(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class CoordinatorConf extends RssBaseConf {
.key("rss.coordinator.shuffle.nodes.max")
.intType()
.defaultValue(9)
.withDescription("The max number of shuffle server when do the assignment");
.withDescription("The max limitation number of shuffle server when do the assignment");
public static final ConfigOption<List<String>> COORDINATOR_ACCESS_CHECKERS = ConfigOptions
.key("rss.coordinator.access.checkers")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,21 @@ public void getShuffleAssignments(
final int partitionNumPerRange = request.getPartitionNumPerRange();
final int replica = request.getDataReplica();
final Set<String> requiredTags = Sets.newHashSet(request.getRequireTagsList());
final int requiredShuffleServerNumber = request.getAssignmentShuffleServerNumber();

LOG.info("Request of getShuffleAssignments for appId[" + appId
+ "], shuffleId[" + shuffleId + "], partitionNum[" + partitionNum
+ "], partitionNumPerRange[" + partitionNumPerRange + "], replica[" + replica + "]");
+ "], partitionNumPerRange[" + partitionNumPerRange + "], replica[" + replica
+ "], requiredTags[" + requiredTags
+ "], requiredShuffleServerNumber[" + requiredShuffleServerNumber + "]"
);

GetShuffleAssignmentsResponse response;
try {
final PartitionRangeAssignment pra =
coordinatorServer.getAssignmentStrategy().assign(partitionNum, partitionNumPerRange, replica, requiredTags);
coordinatorServer
.getAssignmentStrategy()
.assign(partitionNum, partitionNumPerRange, replica, requiredTags, requiredShuffleServerNumber);
response =
CoordinatorUtils.toGetShuffleAssignmentsResponse(pra);
logAssignmentResult(appId, shuffleId, pra);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ public PartitionRangeAssignment assign(
int totalPartitionNum,
int partitionNumPerRange,
int replica,
Set<String> requiredTags) {
Set<String> requiredTags,
int requiredShuffleServerNumber) {

if (partitionNumPerRange != 1) {
throw new RuntimeException("PartitionNumPerRange must be one");
Expand Down Expand Up @@ -107,8 +108,13 @@ public int compare(ServerNode o1, ServerNode o2) {
throw new RuntimeException("There isn't enough shuffle servers");
}

int expectNum = clusterManager.getShuffleNodesMax();
if (nodes.size() < clusterManager.getShuffleNodesMax()) {
final int assignmentMaxNum = clusterManager.getShuffleNodesMax();
int expectNum = assignmentMaxNum;
if (requiredShuffleServerNumber < assignmentMaxNum && requiredShuffleServerNumber > 0) {
expectNum = requiredShuffleServerNumber;
}

if (nodes.size() < expectNum) {
LOG.warn("Can't get expected servers [" + expectNum + "] and found only [" + nodes.size() + "]");
expectNum = nodes.size();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.google.common.collect.Sets;
import java.util.Collection;
import java.util.stream.Collectors;
import org.apache.uniffle.common.PartitionRange;

import java.io.IOException;
Expand Down Expand Up @@ -64,7 +66,7 @@ public void testAssign() {
20 - i, 0, tags, true));
}

PartitionRangeAssignment pra = strategy.assign(100, 10, 2, tags);
PartitionRangeAssignment pra = strategy.assign(100, 10, 2, tags, -1);
SortedMap<PartitionRange, List<ServerNode>> assignments = pra.getAssignments();
assertEquals(10, assignments.size());

Expand All @@ -90,14 +92,14 @@ public void testRandomAssign() {
clusterManager.add(new ServerNode(String.valueOf(i), "", 0, 0, 0,
0, 0, tags, true));
}
PartitionRangeAssignment pra = strategy.assign(100, 10, 2, tags);
PartitionRangeAssignment pra = strategy.assign(100, 10, 2, tags, -1);
SortedMap<PartitionRange, List<ServerNode>> assignments = pra.getAssignments();
Set<ServerNode> serverNodes1 = Sets.newHashSet();
for (Map.Entry<PartitionRange, List<ServerNode>> assignment : assignments.entrySet()) {
serverNodes1.addAll(assignment.getValue());
}

pra = strategy.assign(100, 10, 2, tags);
pra = strategy.assign(100, 10, 2, tags, -1);
assignments = pra.getAssignments();
Set<ServerNode> serverNodes2 = Sets.newHashSet();
for (Map.Entry<PartitionRange, List<ServerNode>> assignment : assignments.entrySet()) {
Expand All @@ -118,13 +120,13 @@ public void testAssignWithDifferentNodeNum() {
0, 0, tags, true);

clusterManager.add(sn1);
PartitionRangeAssignment pra = strategy.assign(100, 10, 2, tags);
PartitionRangeAssignment pra = strategy.assign(100, 10, 2, tags, -1);
// nodeNum < replica
assertNull(pra.getAssignments());

// nodeNum = replica
clusterManager.add(sn2);
pra = strategy.assign(100, 10, 2, tags);
pra = strategy.assign(100, 10, 2, tags, -1);
SortedMap<PartitionRange, List<ServerNode>> assignments = pra.getAssignments();
Set<ServerNode> serverNodes = Sets.newHashSet();
for (Map.Entry<PartitionRange, List<ServerNode>> assignment : assignments.entrySet()) {
Expand All @@ -136,7 +138,7 @@ public void testAssignWithDifferentNodeNum() {

// nodeNum > replica & nodeNum < shuffleNodesMax
clusterManager.add(sn3);
pra = strategy.assign(100, 10, 2, tags);
pra = strategy.assign(100, 10, 2, tags, -1);
assignments = pra.getAssignments();
serverNodes = Sets.newHashSet();
for (Map.Entry<PartitionRange, List<ServerNode>> assignment : assignments.entrySet()) {
Expand All @@ -147,4 +149,94 @@ public void testAssignWithDifferentNodeNum() {
assertTrue(serverNodes.contains(sn2));
assertTrue(serverNodes.contains(sn3));
}

@Test
public void testAssignmentShuffleNodesNum() {
Set<String> serverTags = Sets.newHashSet("tag-1");

for (int i = 0; i < 20; ++i) {
clusterManager.add(new ServerNode("t1-" + i, "", 0, 0, 0,
20 - i, 0, serverTags, true));
}

/**
* case1: the user specifies an illegal shuffle node number (< 0);
* the default shuffle node number is used when there are enough servers.
*/
PartitionRangeAssignment pra = strategy.assign(100, 10, 1, serverTags, -1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a case when assignShuffleServer is equal to 0?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update

assertEquals(
shuffleNodesMax,
pra.getAssignments()
.values()
.stream()
.flatMap(Collection::stream)
.collect(Collectors.toSet())
.size()
);

/**
* case2: the user specifies an illegal shuffle node number (== 0);
* the default shuffle node number is used when there are enough servers.
*/
pra = strategy.assign(100, 10, 1, serverTags, 0);
assertEquals(
shuffleNodesMax,
pra.getAssignments()
.values()
.stream()
.flatMap(Collection::stream)
.collect(Collectors.toSet())
.size()
);

/**
* case3: the user specifies an illegal shuffle node number (> the default max limit);
* the default shuffle node number is used when there are enough servers.
*/
pra = strategy.assign(100, 10, 1, serverTags, shuffleNodesMax + 10);
assertEquals(
shuffleNodesMax,
pra.getAssignments()
.values()
.stream()
.flatMap(Collection::stream)
.collect(Collectors.toSet())
.size()
);

/**
* case4: the user specifies a legal shuffle node number;
* the customized shuffle node number is used when there are enough servers.
*/
pra = strategy.assign(100, 10, 1, serverTags, shuffleNodesMax - 1);
assertEquals(
shuffleNodesMax - 1,
pra.getAssignments()
.values()
.stream()
.flatMap(Collection::stream)
.collect(Collectors.toSet())
.size()
);

/**
* case5: the user specifies a legal shuffle node number, but the cluster doesn't have enough servers;
* all of the available servers are returned.
*/
serverTags = Sets.newHashSet("tag-2");
for (int i = 0; i < shuffleNodesMax - 1; ++i) {
clusterManager.add(new ServerNode("t2-" + i, "", 0, 0, 0,
20 - i, 0, serverTags, true));
}
pra = strategy.assign(100, 10, 1, serverTags, shuffleNodesMax);
assertEquals(
shuffleNodesMax - 1,
pra.getAssignments()
.values()
.stream()
.flatMap(Collection::stream)
.collect(Collectors.toSet())
.size()
);
}
}
Loading