diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala index df4b085d2251e..008f5fcf6e44f 100644 --- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala @@ -81,7 +81,8 @@ private[nio] class ConnectionManager( private val ackTimeoutMonitor = new HashedWheelTimer(Utils.namedThreadFactory("AckTimeoutMonitor")) - private val ackTimeout = conf.getInt("spark.core.connection.ack.wait.timeout", 60) + private val ackTimeout = + conf.getInt("spark.core.connection.ack.wait.timeout", conf.getInt("spark.network.timeout", 100)) private val handleMessageExecutor = new ThreadPoolExecutor( conf.getInt("spark.core.connection.handler.threads.min", 20), diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 685b2e11440fb..72b133a64eb05 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -52,8 +52,11 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus private val akkaTimeout = AkkaUtils.askTimeout(conf) - val slaveTimeout = conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", - math.max(conf.getInt("spark.executor.heartbeatInterval", 10000) * 3, 45000)) + val slaveTimeout = { + val defaultMs = math.max(conf.getInt("spark.executor.heartbeatInterval", 10000) * 3, 45000) + val networkTimeout = conf.getInt("spark.network.timeout", defaultMs / 1000) + conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", networkTimeout * 1000) + } val checkTimeoutInterval = conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000) diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index 8c2457f56bffe..64e3a5416c6b5 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -65,7 +65,7 @@ private[spark] object AkkaUtils extends Logging { val akkaThreads = conf.getInt("spark.akka.threads", 4) val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15) - val akkaTimeout = conf.getInt("spark.akka.timeout", 100) + val akkaTimeout = conf.getInt("spark.akka.timeout", conf.getInt("spark.network.timeout", 100)) val akkaFrameSize = maxFrameSizeBytes(conf) val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false) val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off" diff --git a/docs/configuration.md b/docs/configuration.md index 0b77f5ab645c9..f8822b9441e65 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -776,6 +776,16 @@ Apart from these, the following properties are also available, and may be useful Communication timeout between Spark nodes, in seconds. +
spark.network.timeoutspark.core.connection.ack.wait.timeout, spark.akka.timeout,
+ spark.storage.blockManagerSlaveTimeoutMs or spark.shuffle.io.connectionTimeout,
+ if they are not configured.
+ spark.akka.heartbeat.pauses