Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,23 @@ public class RssMRConfig {
MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES;
public static final int RSS_CLIENT_ASSIGNMENT_RETRY_TIMES_DEFAULT_VALUE =
RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES_DEFAULT_VALUE;


public static final String RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED =
MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED;
public static final boolean RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED_DEFAULT_VALUE =
RssClientConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED_DEFAULT_VALUE;

public static final String RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR =
MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR;

public static final double RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR_DEFAULT_VALUE
= RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR_DEFAULT_VALUE;

public static final String RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER =
MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER;
public static final int RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE =
RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE;

public static final String RSS_CONF_FILE = "rss_conf.xml";

public static final Set<String> RSS_MANDATORY_CLUSTER_CONF =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,4 +205,40 @@ public static long getTaskAttemptId(long blockId) {
& MAX_ATTEMPT_ID;
return (attemptId << (Constants.TASK_ATTEMPT_ID_MAX_LENGTH + Constants.PARTITION_ID_MAX_LENGTH)) + mapId;
}

public static int estimateTaskConcurrency(JobConf jobConf) {
double dynamicFactor = jobConf.getDouble(RssMRConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR,
RssMRConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR_DEFAULT_VALUE);
double slowStart = jobConf.getDouble(Constants.MR_SLOW_START, Constants.MR_SLOW_START_DEFAULT_VALUE);
int mapNum = jobConf.getNumMapTasks();
int reduceNum = jobConf.getNumReduceTasks();
int mapLimit = jobConf.getInt(Constants.MR_MAP_LIMIT, Constants.MR_MAP_LIMIT_DEFAULT_VALUE);
int reduceLimit = jobConf.getInt(Constants.MR_REDUCE_LIMIT, Constants.MR_REDUCE_LIMIT_DEFAULT_VALUE);

int estimateMapNum = mapLimit > 0 ? Math.min(mapNum, mapLimit) : mapNum;
int estimateReduceNum = reduceLimit > 0 ? Math.min(reduceNum, reduceLimit) : reduceNum;
if (slowStart == 1) {
return (int) (Math.max(estimateMapNum, estimateReduceNum) * dynamicFactor);
} else {
return (int) (((1 - slowStart) * estimateMapNum + estimateReduceNum) * dynamicFactor);
}
}

public static int getRequiredShuffleServerNumber(JobConf jobConf) {
int requiredShuffleServerNumber = jobConf.getInt(
RssMRConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER,
RssMRConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER_DEFAULT_VALUE
);
boolean enabledEstimateServer = jobConf.getBoolean(
RssMRConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED,
RssMRConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED_DEFAULT_VALUE
);
if (!enabledEstimateServer || requiredShuffleServerNumber > 0) {
return requiredShuffleServerNumber;
}
int taskConcurrency = estimateTaskConcurrency(jobConf);
int taskConcurrencyPerServer = jobConf.getInt(RssMRConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER,
RssMRConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE);
return (int) Math.ceil(taskConcurrency * 1.0 / taskConcurrencyPerServer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,7 @@ public Thread newThread(Runnable r) {
conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, originalAttempts + inc);
}

int requiredAssignmentShuffleServersNum = conf.getInt(
RssMRConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER,
RssMRConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER_DEFAULT_VALUE
);

int requiredAssignmentShuffleServersNum = RssMRUtils.getRequiredShuffleServerNumber(conf);
// retryInterval must bigger than `rss.server.heartbeat.timeout`, or maybe it will return the same result
long retryInterval = conf.getLong(RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL,
RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,4 +170,46 @@ public void applyDynamicClientConfTest() {
assertEquals(Integer.toString(RssClientConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE),
conf.get(RssMRConfig.RSS_CLIENT_RETRY_MAX));
}

@Test
public void testEstimateTaskConcurrency() {
JobConf jobConf = new JobConf();
jobConf.setInt("mapreduce.job.maps", 500);
jobConf.setInt("mapreduce.job.reduces", 20);
assertEquals(495, RssMRUtils.estimateTaskConcurrency(jobConf));

jobConf.setDouble(Constants.MR_SLOW_START, 1.0);
assertEquals(500, RssMRUtils.estimateTaskConcurrency(jobConf));
jobConf.setInt(Constants.MR_MAP_LIMIT, 200);
jobConf.setInt(Constants.MR_REDUCE_LIMIT, 200);
assertEquals(200, RssMRUtils.estimateTaskConcurrency(jobConf));

jobConf.setDouble("mapreduce.rss.estimate.task.concurrency.dynamic.factor", 0.5);
assertEquals(100, RssMRUtils.estimateTaskConcurrency(jobConf));
}

@Test
public void testGetRequiredShuffleServerNumber() {
JobConf jobConf = new JobConf();
jobConf.setInt("mapreduce.job.maps", 500);
jobConf.setInt("mapreduce.job.reduces", 20);
jobConf.setInt(RssMRConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER, 10);
assertEquals(10, RssMRUtils.getRequiredShuffleServerNumber(jobConf));

jobConf.setBoolean(RssMRConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED, true);
assertEquals(10, RssMRUtils.getRequiredShuffleServerNumber(jobConf));

jobConf.unset(RssMRConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER);
assertEquals(7, RssMRUtils.getRequiredShuffleServerNumber(jobConf));

jobConf.setDouble(Constants.MR_SLOW_START, 1.0);
assertEquals(7, RssMRUtils.getRequiredShuffleServerNumber(jobConf));

jobConf.setInt(Constants.MR_MAP_LIMIT, 200);
jobConf.setInt(Constants.MR_REDUCE_LIMIT, 200);
assertEquals(3, RssMRUtils.getRequiredShuffleServerNumber(jobConf));

jobConf.setDouble("mapreduce.rss.estimate.task.concurrency.dynamic.factor", 0.5);
assertEquals(2, RssMRUtils.getRequiredShuffleServerNumber(jobConf));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -248,12 +248,17 @@ public class RssSparkConfig {
+ " to be allocated"))
.createWithDefault(RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR_DEFAULT_VALUE);

public static final ConfigEntry<Boolean> RSS_ESTIMATE_TASK_CONCURRENCY_ENABLED = createBooleanBuilder(
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_ENABLED)
.doc("When the Coordinator enables rss.coordinator.select.partition.strategy,"
+ " this configuration item is valid and is used to estimate how many consecutive"
+ " PartitionRanges should be allocated to a ShuffleServer"))
.createWithDefault(RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DEFAULT_VALUE);
public static final ConfigEntry<Boolean> RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED = createBooleanBuilder(
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED)
.doc("Whether to estimate the number of ShuffleServers to be allocated based on the number"
+ " of concurrent tasks."))
.createWithDefault(RssClientConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED_DEFAULT_VALUE);

public static final ConfigEntry<Integer> RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER = createIntegerBuilder(
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER)
.doc("How many tasks concurrency to allocate a ShuffleServer, you need to enable"
+ " spark.rss.estimate.server.assignment.enabled"))
.createWithDefault(RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE);

public static final Set<String> RSS_MANDATORY_CLUSTER_CONF =
ImmutableSet.of(RSS_STORAGE_TYPE.key(), RSS_REMOTE_STORAGE_PATH.key());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,15 @@ public static int estimateTaskConcurrency(SparkConf sparkConf) {
}
return taskConcurrency;
}

public static int getRequiredShuffleServerNumber(SparkConf sparkConf) {
boolean enabledEstimateServer = sparkConf.get(RssSparkConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED);
int requiredShuffleServerNumber = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER);
if (!enabledEstimateServer || requiredShuffleServerNumber > 0) {
return requiredShuffleServerNumber;
}
int estimateTaskConcurrency = RssSparkShuffleUtils.estimateTaskConcurrency(sparkConf);
int taskConcurrencyPerServer = sparkConf.get(RssSparkConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER);
return (int) Math.ceil(estimateTaskConcurrency * 1.0 / taskConcurrencyPerServer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ public void testEstimateTaskConcurrency() {
sparkConf.set(Constants.SPARK_DYNAMIC_ENABLED, "true");
sparkConf.set(Constants.SPARK_MAX_DYNAMIC_EXECUTOR, "200");
sparkConf.set(Constants.SPARK_MIN_DYNAMIC_EXECUTOR, "100");
sparkConf.set(RssSparkConfig.RSS_ESTIMATE_TASK_CONCURRENCY_ENABLED, true);
sparkConf.set(Constants.SPARK_EXECUTOR_CORES, "2");
int taskConcurrency;

Expand All @@ -188,4 +187,27 @@ public void testEstimateTaskConcurrency() {
taskConcurrency = RssSparkShuffleUtils.estimateTaskConcurrency(sparkConf);
assertEquals(70, taskConcurrency);
}

@Test
public void testGetRequiredShuffleServerNumber() {
SparkConf sparkConf = new SparkConf();
sparkConf.set(Constants.SPARK_DYNAMIC_ENABLED, "true");
sparkConf.set(Constants.SPARK_MAX_DYNAMIC_EXECUTOR, "200");
sparkConf.set(Constants.SPARK_MIN_DYNAMIC_EXECUTOR, "100");
sparkConf.set(Constants.SPARK_EXECUTOR_CORES, "4");

assertEquals(-1, RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf));

sparkConf.set(RssSparkConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED, true);
assertEquals(10, RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf));

sparkConf.set(Constants.SPARK_TASK_CPUS, "2");
assertEquals(5, RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf));

sparkConf.set(RssSparkConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR, 0.5);
assertEquals(4, RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf));

sparkConf.set(RssSparkConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER, 100);
assertEquals(3, RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, int numMaps, Shuff
// get all register info according to coordinator's response
Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);

int requiredShuffleServerNumber = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER);
int requiredShuffleServerNumber = RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf);

// retryInterval must bigger than `rss.server.heartbeat.timeout`, or maybe it will return the same result
long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,14 +277,12 @@ public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, ShuffleDependency<

Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);

int requiredShuffleServerNumber = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER);
int requiredShuffleServerNumber = RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf);

// retryInterval must bigger than `rss.server.heartbeat.timeout`, or maybe it will return the same result
long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL);
int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES);
boolean enabledEstimateTaskConcurrency = sparkConf.get(RssSparkConfig.RSS_ESTIMATE_TASK_CONCURRENCY_ENABLED);
int estimateTaskConcurrency = enabledEstimateTaskConcurrency
? RssSparkShuffleUtils.estimateTaskConcurrency(sparkConf) : -1;
int estimateTaskConcurrency = RssSparkShuffleUtils.estimateTaskConcurrency(sparkConf);
Map<Integer, List<ShuffleServerInfo>> partitionToServers;
try {
partitionToServers = RetryUtils.retry(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ public class RssClientConfig {
"rss.estimate.task.concurrency.dynamic.factor";
public static final double RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR_DEFAULT_VALUE = 1.0;

public static final String RSS_ESTIMATE_TASK_CONCURRENCY_ENABLED = "rss.estimate.task.concurrency.enabled";
public static final boolean RSS_ESTIMATE_TASK_CONCURRENCY_DEFAULT_VALUE = false;
public static final String RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED = "rss.estimate.server.assignment.enabled";
public static final boolean RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED_DEFAULT_VALUE = false;

public static final String RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER = "rss.estimate.task.concurrency.per.server";
public static final int RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE = 80;

}
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,8 @@ public class Constants {
public static final String MR_REDUCES = "mapreduce.job.reduces";
public static final String MR_MAP_LIMIT = "mapreduce.job.running.map.limit";
public static final String MR_REDUCE_LIMIT = "mapreduce.job.running.reduce.limit";
public static int MR_MAP_LIMIT_DEFAULT_VALUE = 0;
public static int MR_REDUCE_LIMIT_DEFAULT_VALUE = 0;
public static final String MR_SLOW_START = "mapreduce.job.reduce.slowstart.completedmaps";
public static double MR_SLOW_START_DEFAULT_VALUE = 0.05;
}
8 changes: 3 additions & 5 deletions docs/client_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@ and Continuous partition assignment mechanism.
```bash
# Default value is ROUND, it will poll to allocate partitions to ShuffleServer
rss.coordinator.select.partition.strategy CONTINUOUS

# Default value is false, the CONTINUOUS allocation mechanism relies on enabling this configuration, and estimates how many consecutive allocations should be allocated based on task concurrency
--conf spark.rss.estimate.task.concurrency.enabled=true

# Default value is 1.0, used to estimate task concurrency, how likely is this part of the resource between spark.dynamicAllocation.minExecutors and spark.dynamicAllocation.maxExecutors to be allocated
--conf spark.rss.estimate.task.concurrency.dynamic.factor=1.0
Expand Down Expand Up @@ -118,8 +115,9 @@ These configurations are shared by all types of clients.
|<client_type>.rss.client.io.compression.codec|lz4|The compression codec is used to compress the shuffle data. Default codec is `lz4`. Other options are`ZSTD` and `SNAPPY`.|
|<client_type>.rss.client.io.compression.zstd.level|3|The zstd compression level, the default level is 3|
|<client_type>.rss.client.shuffle.data.distribution.type|NORMAL|The type of partition shuffle data distribution, including normal and local_order. The default value is normal. Now this config is only valid in Spark3.x|
|<client_type>.rss.estimate.task.concurrency.enabled|false|Only works in spark3, whether to enable task concurrency estimation, only valid if rss.coordinator.select.partition.strategy is CONTINUOUS, this feature can improve performance in AQE scenarios.|
|<client_type>.rss.estimate.task.concurrency.dynamic.factor|1.0|Between 0 and 1, used to estimate task concurrency, how likely is this part of the resource between spark.dynamicAllocation.minExecutors and spark.dynamicAllocation.maxExecutors to be allocated. Only works in spark3, <client_type>.rss.estimate.task.concurrency.enabled=true, and Coordinator's rss.coordinator.select.partition.strategy is CONTINUOUS.|
|<client_type>.rss.estimate.task.concurrency.dynamic.factor|1.0|Between 0 and 1, used to estimate task concurrency, when the client is spark, it represents how likely is this part of the resource between spark.dynamicAllocation.minExecutors and spark.dynamicAllocation.maxExecutors to be allocated, when the client is mr, it represents how likely the resources of map and reduce are satisfied. Effective when <client_type>.rss.estimate.server.assignment.enabled=true or Coordinator's rss.coordinator.select.partition.strategy is CONTINUOUS.|
|<client_type>.rss.estimate.server.assignment.enabled|false|Support mr and spark, whether to enable estimation of the number of ShuffleServers that need to be allocated based on the number of concurrent tasks.|
|<client_type>.rss.estimate.task.concurrency.per.server|80|It takes effect when rss.estimate.server.assignment.enabled=true, how many tasks are concurrently assigned to a ShuffleServer.|
Notice:

1. `<client_type>` should be `spark` or `mapreduce`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ public void updateSparkConfWithRss(SparkConf sparkConf) {
sparkConf.set(RssSparkConfig.RSS_DATA_REPLICA.key(), String.valueOf(replicateWrite));
sparkConf.set(RssSparkConfig.RSS_DATA_REPLICA_WRITE.key(), String.valueOf(replicateWrite));
sparkConf.set(RssSparkConfig.RSS_DATA_REPLICA_READ.key(), String.valueOf(replicateRead));
sparkConf.set(RssSparkConfig.RSS_ESTIMATE_TASK_CONCURRENCY_ENABLED, true);
sparkConf.set("spark.shuffle.manager",
"org.apache.uniffle.test.GetShuffleReportForMultiPartTest$RssShuffleManagerWrapper");
}
Expand Down