@@ -26,7 +26,7 @@ import org.apache.spark.streaming.util.RecurringTimer
 import org.apache.spark.util.{Clock, Utils}
 
 /**
- * Class that manages executor allocated to a StreamingContext, and dynamically request or kill
+ * Class that manages executors allocated to a StreamingContext, and dynamically requests or kills
  * executors based on the statistics of the streaming computation. This is different from the core
  * dynamic allocation policy; the core policy relies on executors being idle for a while, but the
  * micro-batch model of streaming prevents any particular executors from being idle for a long
@@ -43,6 +43,10 @@ import org.apache.spark.util.{Clock, Utils}
  *
  * This features should ideally be used in conjunction with backpressure, as backpressure ensures
  * system stability, while executors are being readjusted.
+ *
+ * Note that an initial set of executors (spark.executor.instances) was allocated when the
+ * SparkContext was created. This class scales executors up/down after the StreamingContext
+ * has started.
  */
 private[streaming] class ExecutorAllocationManager(
     client: ExecutorAllocationClient,
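For context, a minimal sketch of a driver configuration that opts in to this allocation manager together with backpressure, as the Scaladoc recommends. The object name, application name, batch interval, and the specific min/max and instance counts are illustrative assumptions, not part of this patch.

// Sketch (not part of this diff): a driver conf enabling streaming dynamic allocation.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingDynAllocSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("StreamingDynAllocSketch")
      // Streaming-specific dynamic allocation (the ENABLED_KEY checked in this file).
      .set("spark.streaming.dynamicAllocation.enabled", "true")
      .set("spark.streaming.dynamicAllocation.minExecutors", "2")   // illustrative bounds
      .set("spark.streaming.dynamicAllocation.maxExecutors", "10")
      // With this change an initial executor count may coexist with the feature; those
      // executors are allocated when the SparkContext is created, then scaled up/down.
      .set("spark.executor.instances", "4")
      // Backpressure keeps ingestion stable while executors are being readjusted.
      .set("spark.streaming.backpressure.enabled", "true")
      // Core dynamic allocation must remain disabled, or isDynamicAllocationEnabled throws.
      .set("spark.dynamicAllocation.enabled", "false")

    // A 5-second batch interval is illustrative; input streams and ssc.start() are omitted.
    val ssc = new StreamingContext(conf, Seconds(5))
  }
}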
@@ -202,12 +206,7 @@ private[streaming] object ExecutorAllocationManager extends Logging {
   val MAX_EXECUTORS_KEY = "spark.streaming.dynamicAllocation.maxExecutors"
 
   def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
-    val numExecutor = conf.getInt("spark.executor.instances", 0)
     val streamingDynamicAllocationEnabled = conf.getBoolean(ENABLED_KEY, false)
-    if (numExecutor != 0 && streamingDynamicAllocationEnabled) {
-      throw new IllegalArgumentException(
-        "Dynamic Allocation for streaming cannot be enabled while spark.executor.instances is set.")
-    }
     if (Utils.isDynamicAllocationEnabled(conf) && streamingDynamicAllocationEnabled) {
       throw new IllegalArgumentException(
         """
@@ -217,7 +216,7 @@ private[streaming] object ExecutorAllocationManager extends Logging {
         """.stripMargin)
     }
     val testing = conf.getBoolean("spark.streaming.dynamicAllocation.testing", false)
-    numExecutor == 0 && streamingDynamicAllocationEnabled && (!Utils.isLocalMaster(conf) || testing)
+    streamingDynamicAllocationEnabled && (!Utils.isLocalMaster(conf) || testing)
   }
 
   def createIfEnabled(
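A rough sketch of how the revised check behaves. Because the object is private[streaming], code like this would have to live under the org.apache.spark.streaming package (for example in a test); the object name, executor count, and use of the testing flag are illustrative assumptions, not part of the patch.

// Sketch (not part of this diff): exercising the revised gating logic.
package org.apache.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.scheduler.ExecutorAllocationManager

object AllocationCheckSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.streaming.dynamicAllocation.enabled", "true")
      // Previously a non-zero value here made isDynamicAllocationEnabled throw; it is now permitted.
      .set("spark.executor.instances", "4")
      // Core dynamic allocation still conflicts and must remain disabled.
      .set("spark.dynamicAllocation.enabled", "false")
      // The testing flag bypasses the local-master restriction, as in the last line of the method.
      .set("spark.streaming.dynamicAllocation.testing", "true")

    // Expected: true after this change (the old code also required spark.executor.instances == 0).
    println(ExecutorAllocationManager.isDynamicAllocationEnabled(conf))
  }
}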