Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public TransportChannelHandler initializePipeline(
.addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
.addLast("decoder", DECODER)
.addLast("idleStateHandler",
new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
new IdleStateHandler(0, 0, conf.sessionTimeoutMs() / 1000))
// NOTE: Chunks are currently guaranteed to be returned in the order of request, but this
// would require more logic to guarantee if this were not part of the same event loop.
.addLast("handler", channelHandler);
Expand Down Expand Up @@ -228,7 +228,7 @@ private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler
TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
rpcHandler, conf.maxChunksBeingTransferred(), chunkFetchRequestHandler);
return new TransportChannelHandler(client, responseHandler, requestHandler,
conf.connectionTimeoutMs(), separateChunkFetchRequest, closeIdleConnections, this);
conf.sessionTimeoutMs(), separateChunkFetchRequest, closeIdleConnections, this);
}

/** Returns the transport configuration stored in the {@code conf} field. */
public TransportConf getConf() {
  return conf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,9 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
if (hasInFlightRequests) {
String address = getRemoteAddress(ctx.channel());
logger.error("Connection to {} has been quiet for {} ms while there are outstanding " +
"requests. Assuming connection is dead; please adjust spark.network.timeout if " +
"this is wrong.", address, requestTimeoutNs / 1000 / 1000);
"requests. Assuming connection is dead; please adjust spark.{}.io.sessionTimeout " +
"if this is wrong.", address, requestTimeoutNs / 1000 / 1000,
transportContext.getConf().getModuleName());
client.timeOut();
ctx.close();
} else if (closeIdleConnections) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class TransportConf {
private final String SPARK_NETWORK_IO_MODE_KEY;
private final String SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY;
private final String SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY;
private final String SPARK_NETWORK_IO_SESSIONTIMEOUT_KEY;
private final String SPARK_NETWORK_IO_BACKLOG_KEY;
private final String SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY;
private final String SPARK_NETWORK_IO_SERVERTHREADS_KEY;
Expand All @@ -54,6 +55,7 @@ public TransportConf(String module, ConfigProvider conf) {
SPARK_NETWORK_IO_MODE_KEY = getConfKey("io.mode");
SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY = getConfKey("io.preferDirectBufs");
SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY = getConfKey("io.connectionTimeout");
SPARK_NETWORK_IO_SESSIONTIMEOUT_KEY = getConfKey("io.sessionTimeout");
SPARK_NETWORK_IO_BACKLOG_KEY = getConfKey("io.backLog");
SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY = getConfKey("io.numConnectionsPerPeer");
SPARK_NETWORK_IO_SERVERTHREADS_KEY = getConfKey("io.serverThreads");
Expand Down Expand Up @@ -103,6 +105,13 @@ public int connectionTimeoutMs() {
return (int) defaultTimeoutMs;
}

/**
 * Session timeout in milliseconds.
 *
 * <p>Reads the value stored under the session timeout key; when that key is not set, the
 * result of {@link TransportConf#connectionTimeoutMs} (suffixed with "ms") is used instead.
 *
 * @return the parsed timeout, narrowed from {@code long} to {@code int}
 */
public int sessionTimeoutMs() {
  // Fallback is expressed as a time string so it goes through the same parser as user input.
  String fallbackTimeout = connectionTimeoutMs() + "ms";
  String configuredTimeout = conf.get(SPARK_NETWORK_IO_SESSIONTIMEOUT_KEY, fallbackTimeout);
  long sessionTimeout = JavaUtils.timeStringAsMs(configuredTimeout);
  return (int) sessionTimeout;
}

/** Number of concurrent connections between two nodes for fetching data. */
public int numConnectionsPerPeer() {
return conf.getInt(SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ public class RequestTimeoutIntegrationSuite {
@Before
public void setUp() throws Exception {
Map<String, String> configMap = new HashMap<>();
configMap.put("spark.shuffle.io.connectionTimeout", "10s");
configMap.put("spark.shuffle.io.sessionTimeout", "10s");
configMap.put("spark.shuffle.io.connectionTimeout", "600s");
conf = new TransportConf("shuffle", new MapConfigProvider(configMap));

defaultManager = new StreamManager() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public void closeIdleConnectionForRequestTimeOut() throws IOException, Interrupt

@Override
public String get(String name) {
if ("spark.shuffle.io.connectionTimeout".equals(name)) {
if ("spark.shuffle.io.sessionTimeout".equals(name)) {
// We should make sure there is enough time for us to observe the channel is active
return "1s";
}
Expand Down
24 changes: 23 additions & 1 deletion docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -923,6 +923,27 @@ Apart from these, the following properties are also available, and may be useful
</td>
<td>1.1.1</td>
</tr>
<tr>
<td><code>spark.shuffle.io.connectionTimeout</code></td>
<td>value of <code>spark.network.timeout</code></td>
<td>
(Netty only) Timeout for a shuffle client to establish a connection with the shuffle server. If
the timeout is exceeded, the current handshake is aborted and retried, subject to
<code>spark.shuffle.io.maxRetries</code> for shuffle transportation or
<code>spark.shuffle.registration.maxAttempts</code> for registration with the external shuffle service.
</td>
<td>1.2.0</td>
</tr>
<tr>
<td><code>spark.shuffle.io.sessionTimeout</code></td>
<td>value of <code>spark.shuffle.io.connectionTimeout</code></td>
<td>
(Netty only) Timeout for an established connection between shuffle servers and clients
to be marked as idle. Such a connection is closed because it has been quiet and may be dead,
whether or not there are still outstanding requests.
</td>
<td>3.2.0</td>
</tr>
<tr>
<td><code>spark.shuffle.service.enabled</code></td>
<td>false</td>
Expand Down Expand Up @@ -1920,7 +1941,8 @@ Apart from these, the following properties are also available, and may be useful
<td>
Default timeout for all network interactions. This config will be used in place of
<code>spark.storage.blockManagerHeartbeatTimeoutMs</code>,
<code>spark.shuffle.io.connectionTimeout</code>, <code>spark.rpc.askTimeout</code> or
<code>spark.shuffle.io.connectionTimeout</code>, <code>spark.shuffle.io.sessionTimeout</code>,
<code>spark.rpc.askTimeout</code> or
<code>spark.rpc.lookupTimeout</code> if they are not configured.
</td>
<td>1.3.0</td>
Expand Down