Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -509,12 +509,13 @@ object CometConf extends ShimCometConf {
.createWithDefault(false)

val COMET_EXEC_MEMORY_POOL_TYPE: ConfigEntry[String] = conf("spark.comet.exec.memoryPool")
.doc("The type of memory pool to be used for Comet native execution. " +
"When running Spark in on-heap mode, available pool types are 'greedy', 'fair_spill', " +
"'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', " +
"and `unbounded`. When running Spark in off-heap mode, available pool types are " +
"'unified' and `fair_unified`. The default pool type is `greedy_task_shared` for on-heap " +
s"mode and `unified` for off-heap mode. $TUNING_GUIDE.")
.doc(
"The type of memory pool to be used for Comet native execution. " +
"When running Spark in on-heap mode, available pool types are 'greedy', 'fair_spill', " +
"'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', " +
"and `unbounded`. When running Spark in off-heap mode, available pool types are " +
"'greedy_unified' and `fair_unified`. The default pool type is `greedy_task_shared` " +
s"for on-heap mode and `unified` for off-heap mode. $TUNING_GUIDE.")
.stringConf
.createWithDefault("default")

Expand Down
2 changes: 1 addition & 1 deletion docs/source/user-guide/latest/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ Comet provides the following configuration settings.
| spark.comet.exec.globalLimit.enabled | Whether to enable globalLimit by default. | true |
| spark.comet.exec.hashJoin.enabled | Whether to enable hashJoin by default. | true |
| spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. | true |
| spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution. When running Spark in on-heap mode, available pool types are 'greedy', 'fair_spill', 'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', and `unbounded`. When running Spark in off-heap mode, available pool types are 'unified' and `fair_unified`. The default pool type is `greedy_task_shared` for on-heap mode and `unified` for off-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | default |
| spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution. When running Spark in on-heap mode, available pool types are 'greedy', 'fair_spill', 'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', and `unbounded`. When running Spark in off-heap mode, available pool types are 'greedy_unified' and `fair_unified`. The default pool type is `greedy_task_shared` for on-heap mode and `unified` for off-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | default |
| spark.comet.exec.project.enabled | Whether to enable project by default. | true |
| spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false |
| spark.comet.exec.shuffle.compression.codec | The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and snappy are supported. Compression can be disabled by setting spark.shuffle.compress=false. | lz4 |
Expand Down
6 changes: 3 additions & 3 deletions docs/source/user-guide/latest/tuning.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,13 @@ Comet implements multiple memory pool implementations. The type of pool can be s

The valid pool types for off-heap mode are:

- `unified` (default when `spark.memory.offHeap.enabled=true` is set)
- `fair_unified`
- `fair_unified` (default when `spark.memory.offHeap.enabled=true` is set)
- `greedy_unified`

Both of these pools share off-heap memory between Spark and Comet. This approach is referred to as
unified memory management. The size of the pool is specified by `spark.memory.offHeap.size`.

The `unified` pool type implements a greedy first-come first-serve limit. This pool works well for queries that do not
The `greedy_unified` pool type implements a greedy first-come first-serve limit. This pool works well for queries that do not
need to spill or have a single spillable operator.

The `fair_unified` pool type prevents operators from using more than an even fraction of the available memory
Expand Down
10 changes: 6 additions & 4 deletions native/core/src/execution/memory_pools/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::errors::{CometError, CometResult};

#[derive(Copy, Clone, PartialEq, Eq)]
pub(crate) enum MemoryPoolType {
Unified,
GreedyUnified,
FairUnified,
Greedy,
FairSpill,
Expand Down Expand Up @@ -62,12 +62,14 @@ pub(crate) fn parse_memory_pool_config(
let pool_size = memory_limit as usize;
let memory_pool_config = if off_heap_mode {
match memory_pool_type.as_str() {
"fair_unified" => MemoryPoolConfig::new(MemoryPoolType::FairUnified, pool_size),
"default" | "unified" => {
"default" | "fair_unified" => {
MemoryPoolConfig::new(MemoryPoolType::FairUnified, pool_size)
}
"greedy_unified" => {
// the `unified` memory pool interacts with Spark's memory pool to allocate
// memory therefore does not need a size to be explicitly set. The pool size
// shared with Spark is set by `spark.memory.offHeap.size`.
MemoryPoolConfig::new(MemoryPoolType::Unified, 0)
MemoryPoolConfig::new(MemoryPoolType::GreedyUnified, 0)
}
_ => {
return Err(CometError::Config(format!(
Expand Down
2 changes: 1 addition & 1 deletion native/core/src/execution/memory_pools/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub(crate) fn create_memory_pool(
) -> Arc<dyn MemoryPool> {
const NUM_TRACKED_CONSUMERS: usize = 10;
match memory_pool_config.pool_type {
MemoryPoolType::Unified => {
MemoryPoolType::GreedyUnified => {
// Set Comet memory pool for native
let memory_pool =
CometUnifiedMemoryPool::new(comet_task_memory_manager, task_attempt_id);
Expand Down
Loading