@@ -84,7 +84,7 @@ private[spark] object Worker {
.createWithDefault(100)

val WORKER_DECOMMISSION_SIGNAL =
ConfigBuilder("spark.worker.decommission.signal")
ConfigBuilder("spark.decommission.worker.signal")
.doc("The signal that used to trigger the worker to start decommission.")
.version("3.2.0")
.stringConf
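For orientation, a minimal Scala sketch of supplying the key renamed in the hunk above. The "PWR" value and the use of SparkConf here are illustrative assumptions, not part of this diff; on current releases the key is still spark.worker.decommission.signal.

    import org.apache.spark.SparkConf

    // Hypothetical usage of the key name proposed in this hunk.
    val conf = new SparkConf()
      .set("spark.decommission.worker.signal", "PWR") // "PWR" is an assumed example value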
@@ -421,37 +421,37 @@ package object config {
.createWithDefault(1)

private[spark] val STORAGE_DECOMMISSION_ENABLED =
ConfigBuilder("spark.storage.decommission.enabled")
ConfigBuilder("spark.decommission.storage.enabled")
.doc("Whether to decommission the block manager when decommissioning executor")
.version("3.1.0")
.booleanConf
.createWithDefault(false)

private[spark] val STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED =
ConfigBuilder("spark.storage.decommission.shuffleBlocks.enabled")
ConfigBuilder("spark.decommission.storage.shuffleBlocks.enabled")
.doc("Whether to transfer shuffle blocks during block manager decommissioning. Requires " +
"a migratable shuffle resolver (like sort based shuffle)")
.version("3.1.0")
.booleanConf
.createWithDefault(false)

private[spark] val STORAGE_DECOMMISSION_SHUFFLE_MAX_THREADS =
ConfigBuilder("spark.storage.decommission.shuffleBlocks.maxThreads")
ConfigBuilder("spark.decommission.storage.shuffleBlocks.maxThreads")
.doc("Maximum number of threads to use in migrating shuffle files.")
.version("3.1.0")
.intConf
.checkValue(_ > 0, "The maximum number of threads should be positive")
.createWithDefault(8)

private[spark] val STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED =
ConfigBuilder("spark.storage.decommission.rddBlocks.enabled")
ConfigBuilder("spark.decommission.storage.rddBlocks.enabled")
.doc("Whether to transfer RDD blocks during block manager decommissioning.")
.version("3.1.0")
.booleanConf
.createWithDefault(false)

private[spark] val STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK =
ConfigBuilder("spark.storage.decommission.maxReplicationFailuresPerBlock")
ConfigBuilder("spark.decommission.storage.maxReplicationFailuresPerBlock")
.internal()
.doc("Maximum number of failures which can be handled for the replication of " +
"one RDD block when block manager is decommissioning and trying to move its " +
@@ -461,7 +461,7 @@ package object config {
.createWithDefault(3)

private[spark] val STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL =
ConfigBuilder("spark.storage.decommission.replicationReattemptInterval")
ConfigBuilder("spark.decommission.storage.replicationReattemptInterval")
.internal()
.doc("The interval of time between consecutive cache block replication reattempts " +
"happening on each decommissioning executor (due to storage decommissioning).")
@@ -472,7 +472,7 @@ package object config {
.createWithDefaultString("30s")

private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH =
ConfigBuilder("spark.storage.decommission.fallbackStorage.path")
ConfigBuilder("spark.decommission.storage.fallbackStoragePath")
.doc("The location for fallback storage during block manager decommissioning. " +
"For example, `s3a://spark-storage/`. In case of empty, fallback storage is disabled. " +
"The storage should be managed by TTL because Spark will not clean it up.")
@@ -1917,7 +1917,7 @@ package object config {
.createWithDefault(false)

private[spark] val EXECUTOR_DECOMMISSION_KILL_INTERVAL =
ConfigBuilder("spark.executor.decommission.killInterval")
ConfigBuilder("spark.decommission.executor.killInterval")
.doc("Duration after which a decommissioned executor will be killed forcefully." +
"This config is useful for cloud environments where we know in advance when " +
"an executor is going to go down after decommissioning signal i.e. around 2 mins " +
@@ -1928,7 +1928,7 @@ package object config {
.createOptional

private[spark] val EXECUTOR_DECOMMISSION_SIGNAL =
ConfigBuilder("spark.executor.decommission.signal")
ConfigBuilder("spark.decommission.executor.signal")
.doc("The signal that used to trigger the executor to start decommission.")
.version("3.2.0")
.stringConf
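Finally, a sketch covering the two executor-side keys from the last hunks under the proposed names. The signal value and the 120s interval are illustrative assumptions (killInterval has no default, per createOptional; the 120s merely echoes the "around 2 mins" cloud example in its doc string):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // Proposed names; current releases use the spark.executor.decommission.* prefix.
      .set("spark.decommission.executor.signal", "PWR")        // assumed example signal
      .set("spark.decommission.executor.killInterval", "120s") // optional forced-kill delay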