diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index c2a5d05829..249b0ea6bf 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -509,12 +509,13 @@ object CometConf extends ShimCometConf { .createWithDefault(false) val COMET_EXEC_MEMORY_POOL_TYPE: ConfigEntry[String] = conf("spark.comet.exec.memoryPool") - .doc("The type of memory pool to be used for Comet native execution. " + - "When running Spark in on-heap mode, available pool types are 'greedy', 'fair_spill', " + - "'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', " + - "and `unbounded`. When running Spark in off-heap mode, available pool types are " + - "'unified' and `fair_unified`. The default pool type is `greedy_task_shared` for on-heap " + - s"mode and `unified` for off-heap mode. $TUNING_GUIDE.") + .doc( + "The type of memory pool to be used for Comet native execution. " + + "When running Spark in on-heap mode, available pool types are 'greedy', 'fair_spill', " + + "'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', " + + "and `unbounded`. When running Spark in off-heap mode, available pool types are " + + "'greedy_unified' and `fair_unified`. The default pool type is `greedy_task_shared` " + + s"for on-heap mode and `fair_unified` for off-heap mode. $TUNING_GUIDE.") .stringConf .createWithDefault("default") diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md index bebca3c443..4c8fe810e3 100644 --- a/docs/source/user-guide/latest/configs.md +++ b/docs/source/user-guide/latest/configs.md @@ -49,7 +49,7 @@ Comet provides the following configuration settings. | spark.comet.exec.globalLimit.enabled | Whether to enable globalLimit by default. | true | | spark.comet.exec.hashJoin.enabled | Whether to enable hashJoin by default. 
| true | | spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. | true | -| spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution. When running Spark in on-heap mode, available pool types are 'greedy', 'fair_spill', 'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', and `unbounded`. When running Spark in off-heap mode, available pool types are 'unified' and `fair_unified`. The default pool type is `greedy_task_shared` for on-heap mode and `unified` for off-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | default | +| spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution. When running Spark in on-heap mode, available pool types are 'greedy', 'fair_spill', 'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', and `unbounded`. When running Spark in off-heap mode, available pool types are 'greedy_unified' and `fair_unified`. The default pool type is `greedy_task_shared` for on-heap mode and `fair_unified` for off-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | default | | spark.comet.exec.project.enabled | Whether to enable project by default. | true | | spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false | | spark.comet.exec.shuffle.compression.codec | The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and snappy are supported. Compression can be disabled by setting spark.shuffle.compress=false. 
| lz4 | diff --git a/docs/source/user-guide/latest/tuning.md b/docs/source/user-guide/latest/tuning.md index a35e328570..03aa8793b4 100644 --- a/docs/source/user-guide/latest/tuning.md +++ b/docs/source/user-guide/latest/tuning.md @@ -116,13 +116,13 @@ Comet implements multiple memory pool implementations. The type of pool can be s The valid pool types for off-heap mode are: -- `unified` (default when `spark.memory.offHeap.enabled=true` is set) -- `fair_unified` +- `fair_unified` (default when `spark.memory.offHeap.enabled=true` is set) +- `greedy_unified` Both of these pools share off-heap memory between Spark and Comet. This approach is referred to as unified memory management. The size of the pool is specified by `spark.memory.offHeap.size`. -The `unified` pool type implements a greedy first-come first-serve limit. This pool works well for queries that do not +The `greedy_unified` pool type implements a greedy first-come first-serve limit. This pool works well for queries that do not need to spill or have a single spillable operator. 
The `fair_unified` pool type prevents operators from using more than an even fraction of the available memory diff --git a/native/core/src/execution/memory_pools/config.rs b/native/core/src/execution/memory_pools/config.rs index f9a0e23f4f..ebab45b11f 100644 --- a/native/core/src/execution/memory_pools/config.rs +++ b/native/core/src/execution/memory_pools/config.rs @@ -19,7 +19,7 @@ use crate::errors::{CometError, CometResult}; #[derive(Copy, Clone, PartialEq, Eq)] pub(crate) enum MemoryPoolType { - Unified, + GreedyUnified, FairUnified, Greedy, FairSpill, @@ -62,12 +62,14 @@ pub(crate) fn parse_memory_pool_config( let pool_size = memory_limit as usize; let memory_pool_config = if off_heap_mode { match memory_pool_type.as_str() { - "fair_unified" => MemoryPoolConfig::new(MemoryPoolType::FairUnified, pool_size), - "default" | "unified" => { + "default" | "fair_unified" => { + MemoryPoolConfig::new(MemoryPoolType::FairUnified, pool_size) + } + "greedy_unified" => { // the `unified` memory pool interacts with Spark's memory pool to allocate // memory therefore does not need a size to be explicitly set. The pool size // shared with Spark is set by `spark.memory.offHeap.size`. - MemoryPoolConfig::new(MemoryPoolType::Unified, 0) + MemoryPoolConfig::new(MemoryPoolType::GreedyUnified, 0) } _ => { return Err(CometError::Config(format!( diff --git a/native/core/src/execution/memory_pools/mod.rs b/native/core/src/execution/memory_pools/mod.rs index 3e40dc6923..fc6a81a5e5 100644 --- a/native/core/src/execution/memory_pools/mod.rs +++ b/native/core/src/execution/memory_pools/mod.rs @@ -40,7 +40,7 @@ pub(crate) fn create_memory_pool( ) -> Arc { const NUM_TRACKED_CONSUMERS: usize = 10; match memory_pool_config.pool_type { - MemoryPoolType::Unified => { + MemoryPoolType::GreedyUnified => { // Set Comet memory pool for native let memory_pool = CometUnifiedMemoryPool::new(comet_task_memory_manager, task_attempt_id);