```diff
@@ -81,7 +81,8 @@ private[nio] class ConnectionManager(
   private val ackTimeoutMonitor =
     new HashedWheelTimer(Utils.namedThreadFactory("AckTimeoutMonitor"))

-  private val ackTimeout = conf.getInt("spark.core.connection.ack.wait.timeout", 60)
+  private val ackTimeout =
+    conf.getInt("spark.core.connection.ack.wait.timeout", conf.getInt("spark.network.timeout", 100))
```
**Contributor:** Why was the default value changed from 60 to 100?

**Contributor (Author):** @Lewuathe, that is exactly what I wanted to know: what should the default value be? Should `spark.network.timeout` keep a single fixed timeout value, or should its default vary based on the defaults of the earlier configurations this config is intended to replace? As I am fairly new to Spark, I am not sure which default value is suitable. Maybe @rxin can confirm.

**Contributor:** I think 100 is OK, given the akka timeout was 100.


```diff
   private val handleMessageExecutor = new ThreadPoolExecutor(
     conf.getInt("spark.core.connection.handler.threads.min", 20),
```
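The nested `getInt` gives a three-level resolution: the component-specific key wins, then the umbrella `spark.network.timeout`, then the hard-coded default. Here is a minimal sketch of that lookup chain, where `SimpleConf` is a hypothetical stand-in for `SparkConf` (not the real API):

```scala
// Hypothetical stand-in for SparkConf, only to illustrate resolution order.
class SimpleConf(settings: Map[String, String]) {
  def getInt(key: String, default: => Int): Int =
    settings.get(key).map(_.toInt).getOrElse(default)
}

val conf = new SimpleConf(Map("spark.network.timeout" -> "150"))

// The specific key is absent, so lookup falls back to spark.network.timeout
// (150 here); the literal 100 is only reached when both keys are unset.
val ackTimeout =
  conf.getInt("spark.core.connection.ack.wait.timeout",
    conf.getInt("spark.network.timeout", 100))
assert(ackTimeout == 150)
```

An explicitly set `spark.core.connection.ack.wait.timeout` still takes precedence, so existing deployments that tuned the old key keep their behavior.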
```diff
@@ -52,8 +52,11 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus

   private val akkaTimeout = AkkaUtils.askTimeout(conf)

-  val slaveTimeout = conf.getLong("spark.storage.blockManagerSlaveTimeoutMs",
-    math.max(conf.getInt("spark.executor.heartbeatInterval", 10000) * 3, 45000))
+  val slaveTimeout = {
+    val defaultMs = math.max(conf.getInt("spark.executor.heartbeatInterval", 10000) * 3, 45000)
```
**Contributor:** cc @tdas @sryza

Why is the timeout capped at 45000 max?

**Contributor:** Also @varunsaxena, you should do proper line wrapping here. Right now the line wrap has no semantic meaning at all; you are just wrapping at a random location.

**Contributor (Author):** @rxin, how about this?

```scala
val slaveTimeout = conf.getLong("spark.storage.blockManagerSlaveTimeoutMs",
  conf.getInt("spark.network.timeout",
    math.max(
      conf.getInt("spark.executor.heartbeatInterval", 10000) * 3, 45000) / 1000) * 1000)
```

**Contributor:** How about

```scala
val slaveTimeout = {
  val defaultMs = math.max(conf.getInt("spark.executor.heartbeatInterval", 10000) * 3, 45000)
  val networkTimeout = conf.getInt("spark.network.timeout", defaultMs / 1000)
  conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", networkTimeout)
}
```

I'm still not sure why we cap it to 45000 by default ...

```diff
+    val networkTimeout = conf.getInt("spark.network.timeout", defaultMs / 1000)
+    conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", networkTimeout * 1000)
+  }

   val checkTimeoutInterval = conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000)
```
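A unit subtlety in the merged version: `spark.network.timeout` is read in seconds while `spark.storage.blockManagerSlaveTimeoutMs` is in milliseconds, hence the `/ 1000` and `* 1000` round trip. A plain-arithmetic sketch with the default values (no Spark APIs involved):

```scala
// Default heartbeat interval is 10000 ms; the default is floored at 45000 ms.
val heartbeatMs = 10000
val defaultMs = math.max(heartbeatMs * 3, 45000)  // 45000 ms
val networkTimeoutSecs = defaultMs / 1000         // 45 s when spark.network.timeout is unset
val slaveTimeoutMs = networkTimeoutSecs * 1000    // converted back to 45000 ms
assert(slaveTimeoutMs == 45000)
```

This is also why the `* 1000` in the final version matters: the earlier suggestion without it would have passed a value in seconds where milliseconds are expected, shrinking the effective default by a factor of 1000.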
**core/src/main/scala/org/apache/spark/util/AkkaUtils.scala** (1 addition, 1 deletion)
```diff
@@ -65,7 +65,7 @@ private[spark] object AkkaUtils extends Logging {

   val akkaThreads = conf.getInt("spark.akka.threads", 4)
   val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)
-  val akkaTimeout = conf.getInt("spark.akka.timeout", 100)
+  val akkaTimeout = conf.getInt("spark.akka.timeout", conf.getInt("spark.network.timeout", 100))
   val akkaFrameSize = maxFrameSizeBytes(conf)
   val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
   val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
```
**docs/configuration.md** (10 additions)
```diff
@@ -776,6 +776,16 @@ Apart from these, the following properties are also available, and may be useful
     Communication timeout between Spark nodes, in seconds.
   </td>
 </tr>
+<tr>
+  <td><code>spark.network.timeout</code></td>
+  <td>100</td>
+  <td>
+    Default timeout for all network interactions, in seconds. This config will be used in
+    place of <code>spark.core.connection.ack.wait.timeout</code>, <code>spark.akka.timeout</code>,
+    <code>spark.storage.blockManagerSlaveTimeoutMs</code> or <code>spark.shuffle.io.connectionTimeout</code>,
+    if they are not configured.
+  </td>
+</tr>
 <tr>
   <td><code>spark.akka.heartbeat.pauses</code></td>
   <td>6000</td>
```
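For users this means one knob instead of four. A usage sketch (the value 120 and the app name are arbitrary examples, not recommendations):

```scala
import org.apache.spark.SparkConf

// Setting only spark.network.timeout covers the ack-wait, akka,
// block-manager-slave, and shuffle connection timeouts, unless any of
// those keys is also set explicitly.
val conf = new SparkConf()
  .setAppName("timeout-example")
  .set("spark.network.timeout", "120") // seconds
```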
```diff
@@ -37,7 +37,9 @@ public boolean preferDirectBufs() {

-  /** Connect timeout in secs. Default 120 secs. */
+  /** Connect timeout in secs. Default 100 secs. */
   public int connectionTimeoutMs() {
-    return conf.getInt("spark.shuffle.io.connectionTimeout", 120) * 1000;
+    int timeout =
+      conf.getInt("spark.shuffle.io.connectionTimeout", conf.getInt("spark.network.timeout", 100));
+    return timeout * 1000;
   }

   /** Requested maximum length of the queue of incoming connections. Default -1 for no backlog. */
```
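The method returns milliseconds while both config keys are in seconds, and the hard default drops from 120 s to 100 s when neither key is set. A hypothetical resolver that mirrors this logic, with a plain `Map` standing in for the real config store:

```scala
// Mirrors connectionTimeoutMs(): specific key first, then
// spark.network.timeout, then 100 s; result converted to milliseconds.
def connectionTimeoutMs(settings: Map[String, Int]): Int = {
  val network = settings.getOrElse("spark.network.timeout", 100)
  val timeout = settings.getOrElse("spark.shuffle.io.connectionTimeout", network)
  timeout * 1000
}

assert(connectionTimeoutMs(Map.empty) == 100000)                                       // both unset
assert(connectionTimeoutMs(Map("spark.network.timeout" -> 200)) == 200000)             // umbrella key
assert(connectionTimeoutMs(Map("spark.shuffle.io.connectionTimeout" -> 30)) == 30000)  // specific wins
```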