diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
index c30e744c6f..436f803edc 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
@@ -37,7 +37,8 @@ public class RssBaseConf extends RssConf {
       ConfigOptions.key("rss.rpc.server.type")
           .enumType(ServerType.class)
           .defaultValue(ServerType.GRPC)
-          .withDescription("Shuffle server type, default is grpc");
+          .withDescription(
+              "Shuffle server type, supports GRPC_NETTY, GRPC. The default value is GRPC for now. We recommend using GRPC_NETTY to enable Netty on the server side for better stability and performance.");
 
   public static final ConfigOption<Integer> RPC_SERVER_PORT =
       ConfigOptions.key("rss.rpc.server.port")
diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index ac432e0778..4ecd103c9c 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -193,7 +193,8 @@ public class RssClientConf {
       ConfigOptions.key("rss.client.type")
           .enumType(ClientType.class)
           .defaultValue(ClientType.GRPC)
-          .withDescription("Supports GRPC, GRPC_NETTY");
+          .withDescription(
+              "Supports GRPC_NETTY, GRPC. The default value is GRPC. We recommend using GRPC_NETTY to enable Netty on the client side for better stability and performance.");
 
   public static final ConfigOption<Boolean> RSS_CLIENT_REMOTE_STORAGE_USE_LOCAL_CONF_ENABLED =
       ConfigOptions.key("rss.client.remote.storage.useLocalConfAsDefault")
diff --git a/docs/client_guide/client_guide.md b/docs/client_guide/client_guide.md
index 0918a992a7..b2c98fcb5b 100644
--- a/docs/client_guide/client_guide.md
+++ b/docs/client_guide/client_guide.md
@@ -25,7 +25,7 @@ Uniffle is designed as a unified shuffle engine for multiple computing framework
 Uniffle has provided pluggable client plugins to enable remote shuffle in Spark, MapReduce and Tez.
 
 ## Deploy & client specific configuration
-Refer to the following documents on how to deploy Uniffle client plugins with Spark, MapReduce and Tez. Client specific configurations are also listed in each documents.
+Refer to the following documents on how to deploy Uniffle client plugins with Spark, MapReduce and Tez. Client-specific configurations are also listed in each document.
 |Client|Link|
 |---|---|
 |Spark|[Deploy Spark Client Plugin & Configurations](spark_client_guide.md)|
@@ -37,25 +37,25 @@ Refer to the following documents on how to deploy Uniffle client plugins with Sp
 The important configuration of client is listed as following. These configurations are shared by all types of clients.
 
-|Property Name|Default| Description |
-|---|---|---|
-|<client_type>.rss.coordinator.quorum|-| Coordinator quorum |
-|<client_type>.rss.writer.buffer.size|3m| Buffer size for single partition data |
-|<client_type>.rss.storage.type|-| Supports MEMORY_LOCALFILE, MEMORY_HDFS, MEMORY_LOCALFILE_HDFS |
-|<client_type>.rss.client.read.buffer.size|14m| The max data size read from storage |
-|<client_type>.rss.client.send.threadPool.size|5| The thread size for send shuffle data to shuffle server |
-|<client_type>.rss.client.assignment.tags|-| The comma-separated list of tags for deciding assignment shuffle servers. Notice that the SHUFFLE_SERVER_VERSION will always as the assignment tag whether this conf is set or not |
-|<client_type>.rss.client.data.commit.pool.size|The number of assigned shuffle servers| The thread size for sending commit to shuffle servers |
-|<client_type>.rss.client.assignment.shuffle.nodes.max|-1| The number of required assignment shuffle servers. If it is less than 0 or equals to 0 or greater than the coordinator's config of "rss.coordinator.shuffle.nodes.max", it will use the size of "rss.coordinator.shuffle.nodes.max" default |
-|<client_type>.rss.client.io.compression.codec|lz4| The compression codec is used to compress the shuffle data. Default codec is `lz4`. Other options are`ZSTD` and `SNAPPY`. |
-|<client_type>.rss.client.io.compression.zstd.level|3| The zstd compression level, the default level is 3 |
-|<client_type>.rss.client.shuffle.data.distribution.type|NORMAL| The type of partition shuffle data distribution, including normal and local_order. The default value is normal. Now this config is only valid in Spark3.x |
-|<client_type>.rss.estimate.task.concurrency.dynamic.factor|1.0| Between 0 and 1, used to estimate task concurrency, when the client is spark, it represents how likely is this part of the resource between spark.dynamicAllocation.minExecutors and spark.dynamicAllocation.maxExecutors to be allocated, when the client is mr, it represents how likely the resources of map and reduce are satisfied. Effective when <client_type>.rss.estimate.server.assignment.enabled=true or Coordinator's rss.coordinator.select.partition.strategy is CONTINUOUS. |
-|<client_type>.rss.estimate.server.assignment.enabled|false| Support mr and spark, whether to enable estimation of the number of ShuffleServers that need to be allocated based on the number of concurrent tasks. |
-|<client_type>.rss.estimate.task.concurrency.per.server|80| It takes effect when rss.estimate.server.assignment.enabled=true, how many tasks are concurrently assigned to a ShuffleServer. |
-|<client_type>.rss.client.max.concurrency.of.per-partition.write|-| The maximum number of files that can be written concurrently to a single partition is determined. This value will only be respected by the remote shuffle server if it is greater than 0. |
-|<client_type>.rss.client.rpc.timeout.ms|60000| Timeout in milliseconds for RPC calls. |
-|<client_type>.rss.client.rpc.maxAttempts|3| When we fail to send RPC calls, we will retry for maxAttempts times. |
+| Property Name | Default | Description |
+|---|---|---|
+| <client_type>.rss.coordinator.quorum | - | Coordinator quorum |
+| <client_type>.rss.writer.buffer.size | 3m | Buffer size for single partition data |
+| <client_type>.rss.storage.type | - | Supports MEMORY_LOCALFILE, MEMORY_HDFS, MEMORY_LOCALFILE_HDFS |
+| <client_type>.rss.client.read.buffer.size | 14m | The max data size read from storage |
+| <client_type>.rss.client.send.threadPool.size | 5 | The thread pool size for sending shuffle data to shuffle servers |
+| <client_type>.rss.client.assignment.tags | - | The comma-separated list of tags for deciding the assigned shuffle servers. Notice that SHUFFLE_SERVER_VERSION is always used as an assignment tag whether this conf is set or not |
+| <client_type>.rss.client.data.commit.pool.size | The number of assigned shuffle servers | The thread pool size for sending commits to shuffle servers |
+| <client_type>.rss.client.assignment.shuffle.nodes.max | -1 | The number of required assigned shuffle servers. If it is less than or equal to 0, or greater than the coordinator's config of "rss.coordinator.shuffle.nodes.max", the value of "rss.coordinator.shuffle.nodes.max" is used by default |
+| <client_type>.rss.client.io.compression.codec | lz4 | The compression codec used to compress the shuffle data. The default codec is `lz4`. Other options are `ZSTD` and `SNAPPY`. |
+| <client_type>.rss.client.io.compression.zstd.level | 3 | The zstd compression level, the default level is 3 |
+| <client_type>.rss.client.shuffle.data.distribution.type | NORMAL | The type of partition shuffle data distribution, including normal and local_order. The default value is normal. Currently this config is only valid for Spark 3.x |
+| <client_type>.rss.estimate.task.concurrency.dynamic.factor | 1.0 | Between 0 and 1, used to estimate task concurrency. When the client is Spark, it represents how likely the resources between spark.dynamicAllocation.minExecutors and spark.dynamicAllocation.maxExecutors are to be allocated; when the client is MR, it represents how likely the map and reduce resources are to be satisfied. Effective when <client_type>.rss.estimate.server.assignment.enabled=true or the coordinator's rss.coordinator.select.partition.strategy is CONTINUOUS. |
+| <client_type>.rss.estimate.server.assignment.enabled | false | Supported for MR and Spark. Whether to estimate the number of ShuffleServers that need to be allocated based on the number of concurrent tasks. |
+| <client_type>.rss.estimate.task.concurrency.per.server | 80 | Takes effect when rss.estimate.server.assignment.enabled=true; how many tasks are concurrently assigned to one ShuffleServer. |
+| <client_type>.rss.client.max.concurrency.of.per-partition.write | - | Determines the maximum number of files that can be written concurrently for a single partition. This value is only respected by the remote shuffle server if it is greater than 0. |
+| <client_type>.rss.client.rpc.timeout.ms | 60000 | Timeout in milliseconds for RPC calls. |
+| <client_type>.rss.client.rpc.maxAttempts | 3 | When an RPC call fails, it will be retried up to maxAttempts times. |
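+
+For example, a Spark job could put the following in `spark-defaults.conf` (the values below are illustrative only; use the `mapreduce.` or `tez.` prefix instead of `spark.` for the other client types, and replace the coordinator addresses with your own):
+
+```
+# coordinator quorum of the Uniffle cluster (placeholder addresses)
+spark.rss.coordinator.quorum <coordinator_ip1>:19999,<coordinator_ip2>:19999
+spark.rss.storage.type MEMORY_LOCALFILE_HDFS
+spark.rss.writer.buffer.size 3m
+spark.rss.client.read.buffer.size 14m
+# recommended RPC type when the shuffle servers run with Netty enabled
+spark.rss.client.type GRPC_NETTY
+```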
 
 Notice:
 
 1. `<client_type>` should be `mapreduce` `tez` or `spark`
@@ -69,11 +69,11 @@ Uniffle supports client-side quorum protocol to tolerant shuffle server crash.
 This feature is client-side behaviour, in which shuffle writer sends each block to multiple servers, and shuffle readers could fetch block data from one of server.
 Since sending multiple replicas of blocks can reduce the shuffle performance and resource consumption, we designed it as an optional feature.
 
-|Property Name|Default|Description|
-|---|---|---|
-|<client_type>.rss.data.replica|1|The max server number that each block can be send by client in quorum protocol|
-|<client_type>.rss.data.replica.write|1|The min server number that each block should be send by client successfully|
-|<client_type>.rss.data.replica.read|1|The min server number that metadata should be fetched by client successfully |
+| Property Name | Default | Description |
+|---|---|---|
+| <client_type>.rss.data.replica | 1 | The max number of servers to which the client can send each block in the quorum protocol |
+| <client_type>.rss.data.replica.write | 1 | The min number of servers to which each block should be sent successfully by the client |
+| <client_type>.rss.data.replica.read | 1 | The min number of servers from which the client should successfully fetch the metadata |
 
 Notice:
 
@@ -96,9 +96,9 @@ spark.rss.data.replica.read 2
 ```
 
 ### Netty Setting
-| Property Name | Default | Description |
-|---|---|---|
-| <client_type>.rss.client.type | GRPC | The default is GRPC, we can set it to GRPC_NETTY to enable the Netty on the client |
+| Property Name | Default | Description |
+|---|---|---|
+| <client_type>.rss.client.type | GRPC | Supports GRPC_NETTY, GRPC. The default value is GRPC. We recommend using GRPC_NETTY to enable Netty on the client side for better stability and performance. |
 | <client_type>.rss.client.netty.io.mode | NIO | Netty EventLoopGroup backend, available options: NIO, EPOLL. |
 | <client_type>.rss.client.netty.client.connection.timeout.ms | 600000 | Connection active timeout. |
 | <client_type>.rss.client.netty.client.threads | 0 | Number of threads used in the client thread pool. Default is 0, Netty will use the number of (available logical cores * 2) as the number of threads. |
diff --git a/docs/server_guide.md b/docs/server_guide.md
index efe0856a9f..8799fc4a25 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -72,10 +72,11 @@ This document will introduce how to deploy Uniffle shuffle servers.
 | Property Name | Default | Description |
 |---|---|---|
 | rss.coordinator.quorum | - | Coordinator quorum |
-| rss.rpc.server.port | - | RPC port for Shuffle server, if set zero, grpc server start on random port. |
-| rss.jetty.http.port | - | Http port for Shuffle server |
+| rss.rpc.server.type | GRPC | Shuffle server type, supports GRPC_NETTY, GRPC. The default value is GRPC. We recommend using GRPC_NETTY to enable Netty on the server side for better stability and performance. |
+| rss.rpc.server.port | 19999 | RPC port for Shuffle server; if set to zero, the grpc server starts on a random port. |
+| rss.jetty.http.port | 19998 | HTTP port for Shuffle server |
 | rss.server.netty.port | -1 | Netty port for Shuffle server, if set zero, Netty server start on random port. |
-| rss.server.netty.epoll.enable | false | If enable epoll model with Netty server. |
+| rss.server.netty.epoll.enable | false | Whether to enable the epoll model for the Netty server. |
 | rss.server.netty.accept.thread | 10 | Accept thread count in netty. |
 | rss.server.netty.worker.thread | 0 | Worker thread count in netty. When set to 0, the default value is dynamically set to twice the number of processor cores, but it will not be less than 100 to ensure the minimum throughput of the service. |
 | rss.server.netty.connect.backlog | 0 | For Netty server, requested maximum length of the queue of incoming connections. |
@@ -83,11 +84,11 @@ This document will introduce how to deploy Uniffle shuffle servers.
 | rss.server.netty.receive.buf | 0 | Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 10Gbps, buffer size should be ~ 1.25MB. Default is 0, the operating system automatically estimates the receive buffer size based on default settings. |
 | rss.server.netty.send.buf | 0 | Send buffer size (SO_SNDBUF). |
 | rss.server.buffer.capacity | -1 | Max memory of buffer manager for shuffle server. If negative, JVM heap size * buffer.ratio is used |
-| rss.server.buffer.capacity.ratio | 0.8 | when `rss.server.buffer.capacity`=-1, then the buffer capacity is JVM heap size or off-heap size(when enabling Netty) * ratio |
+| rss.server.buffer.capacity.ratio | 0.6 | When `rss.server.buffer.capacity`=-1, the buffer capacity is the JVM heap size (or the off-heap size when Netty is enabled) * ratio |
 | rss.server.memory.shuffle.highWaterMark.percentage | 75.0 | Threshold of spill data to storage, percentage of rss.server.buffer.capacity |
 | rss.server.memory.shuffle.lowWaterMark.percentage | 25.0 | Threshold of keep data in memory, percentage of rss.server.buffer.capacity |
 | rss.server.read.buffer.capacity | -1 | Max size of buffer for reading data. If negative, JVM heap size * read.buffer.ratio is used |
-| rss.server.read.buffer.capacity.ratio | 0.4 | when `rss.server.read.buffer.capacity`=-1, then read buffer capacity is JVM heap size or off-heap size(when enabling Netty) * ratio |
+| rss.server.read.buffer.capacity.ratio | 0.2 | When `rss.server.read.buffer.capacity`=-1, the read buffer capacity is the JVM heap size (or the off-heap size when Netty is enabled) * ratio |
 | rss.server.heartbeat.interval | 10000 | Heartbeat interval to Coordinator (ms) |
 | rss.server.flush.localfile.threadPool.size | 10 | Thread pool for flush data to local file |
 | rss.server.flush.hadoop.threadPool.size | 60 | Thread pool for flush data to hadoop storage |
@@ -108,13 +109,13 @@ This document will introduce how to deploy Uniffle shuffle servers.
 | rss.server.disk-capacity.watermark.check.enabled | false | If it is co-located with other services, the high-low watermark check based on the uniffle used is not correct. Due to this, the whole disk capacity watermark check is necessary, which will reuse the current watermark value. It will be disabled by default. |
 
 ### Advanced Configurations
-|Property Name|Default| Description |
-|---|---|---|
-|rss.server.storageMediaProvider.from.env.key|-| Sometimes, the local storage type/media info is provided by external system. RSS would read the env key defined by this configuration and get info about the storage media of its basePaths |
-|rss.server.decommission.check.interval|60000| The interval(ms) to check if all applications have finish when server is decommissioning |
-|rss.server.decommission.shutdown|true| Whether shutdown the server after server is decommissioned |
-|rss.server.health.checker.script.path| - | The health script path for `HealthScriptChecker`. To enable `HealthScriptChecker`, need to set `rss.server.health.checker.class.names` and set `rss.server.health.check.enable` to true.|
-|rss.server.health.checker.script.execute.timeout| 5000 | Timeout for `HealthScriptChecker` execute health script.(ms)|
+| Property Name | Default | Description |
+|---|---|---|
+| rss.server.storageMediaProvider.from.env.key | - | Sometimes the local storage type/media info is provided by an external system. RSS reads the env key defined by this configuration to get info about the storage media of its basePaths |
+| rss.server.decommission.check.interval | 60000 | The interval (ms) to check whether all applications have finished when the server is decommissioning |
+| rss.server.decommission.shutdown | true | Whether to shut down the server after it is decommissioned |
+| rss.server.health.checker.script.path | - | The health script path for `HealthScriptChecker`. To enable `HealthScriptChecker`, set `rss.server.health.checker.class.names` and set `rss.server.health.check.enable` to true. |
+| rss.server.health.checker.script.execute.timeout | 5000 | Timeout (ms) for `HealthScriptChecker` to execute the health script. |
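+
+For example, a script-based health check could be wired up roughly as follows (a sketch only: the checker class name and script path below are placeholders; use the fully qualified `HealthScriptChecker` class and the check script from your own deployment):
+
+```
+# turn on health checks and register the script-based checker (class name is illustrative)
+rss.server.health.check.enable true
+rss.server.health.checker.class.names org.apache.uniffle.server.HealthScriptChecker
+rss.server.health.checker.script.path /path/to/health_check.sh
+rss.server.health.checker.script.execute.timeout 5000
+```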
 
 ### Huge Partition Optimization
 A huge partition is a common problem for Spark/MR and so on, caused by data skew. And it can cause the shuffle server to become unstable. To solve this, we introduce some mechanisms to limit the writing of huge partitions to avoid affecting regular partitions, more details can be found in [ISSUE-378](https://github.com/apache/incubator-uniffle/issues/378).
 The basic rules for limiting large partitions are memory usage limits and flushing individual buffers directly to persistent storage.
 
@@ -122,10 +123,10 @@ A huge partition is a common problem for Spark/MR and so on, caused by data skew
 #### Memory usage limit
 To do this, we introduce the extra configs
 
-|Property Name|Default| Description |
-|---|---|---|
-|rss.server.huge-partition.size.threshold|20g| Threshold of huge partition size, once exceeding threshold, memory usage limitation and huge partition buffer flushing will be triggered. This value depends on the capacity of per disk in shuffle server. For example, per disk capacity is 1TB, and the max size of huge partition in per disk is 5. So the total size of huge partition in local disk is 100g (10%),this is an acceptable config value. Once reaching this threshold, it will be better to flush data to HADOOP FS directly, which could be handled by multiple storage manager fallback strategy |
-|rss.server.huge-partition.memory.limit.ratio|0.2| The memory usage limit ratio for huge partition, it will only triggered when partition's size exceeds the threshold of 'rss.server.huge-partition.size.threshold'. If the buffer capacity is 10g, this means the default memory usage for huge partition is 2g. Samely, this config value depends on max size of huge partitions on per shuffle server. |
+| Property Name | Default | Description |
+|---|---|---|
+| rss.server.huge-partition.size.threshold | 20g | Threshold of huge partition size; once the threshold is exceeded, memory usage limitation and huge partition buffer flushing will be triggered. This value depends on the per-disk capacity of the shuffle server. For example, if the per-disk capacity is 1TB and the max number of huge partitions per disk is 5, the total size of huge partitions on a local disk is 100g (10%), which is an acceptable config value. Once this threshold is reached, it is better to flush data to HADOOP FS directly, which can be handled by the multiple storage manager fallback strategy |
+| rss.server.huge-partition.memory.limit.ratio | 0.2 | The memory usage limit ratio for a huge partition; it is only triggered when the partition's size exceeds the threshold of 'rss.server.huge-partition.size.threshold'. If the buffer capacity is 10g, the default memory usage limit for a huge partition is 2g. Similarly, this config value depends on the max size of huge partitions on each shuffle server. |
 
 #### Data flush
 Once the huge partition threshold is reached, the partition is marked as a huge partition. And then single buffer flush is triggered (writing to persistent storage as soon as possible). By default, single buffer flush is only enabled by configuring `rss.server.single.buffer.flush.enabled`, but it's automatically valid for huge partition.
@@ -139,30 +140,32 @@ Finally, to improve the speed of writing to HDFS for a single partition, the val
 ### Netty
 In version 0.8.0, we introduced Netty. Enabling Netty on ShuffleServer can significantly reduce GC time in high-throughput scenarios. We can enable Netty through the parameters `rss.server.netty.port` and `rss.rpc.server.type`.
 Note: After setting the parameter `rss.rpc.server.type` to `GRPC_NETTY`, ShuffleServer will be tagged with `GRPC_NETTY`, that is, the node can only be assigned to clients with `spark.rss.client.type=GRPC_NETTY`.
-When enabling Netty, we should also consider memory related configuration, the following is an example.
+When enabling Netty, we should also consider memory-related configurations; the following is an example.
 
 #### rss-env.sh
 ```
-XMX_SIZE=80g
-MAX_DIRECT_MEMORY_SIZE=60g
+XMX_SIZE=20g
+MAX_DIRECT_MEMORY_SIZE=120g
 ```
 #### server.conf
 ```
-rss.server.buffer.capacity 40g
-rss.server.read.buffer.capacity 20g
+rss.server.buffer.capacity 110g
+rss.server.read.buffer.capacity 5g
 ```
 
 #### Example of server conf
 ```
 rss.rpc.server.port 19999
 rss.jetty.http.port 19998
+rss.rpc.server.type GRPC_NETTY
+rss.server.netty.port 17000
 rss.rpc.executor.size 2000
 rss.storage.type MEMORY_LOCALFILE_HDFS
 rss.coordinator.quorum <coordinator_ip1>:19999,<coordinator_ip2>:19999
 rss.storage.basePath /data1/rssdata,/data2/rssdata....
 rss.server.flush.thread.alive 10
-rss.server.buffer.capacity 40g
-rss.server.read.buffer.capacity 20g
+rss.server.buffer.capacity 110g
+rss.server.read.buffer.capacity 5g
 rss.server.heartbeat.interval 10000
 rss.rpc.message.max.size 1073741824
 rss.server.preAllocation.expired 120000