diff --git a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java index a0de9df1986f..ad69d27e5560 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java +++ b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java @@ -192,7 +192,7 @@ public TransportChannelHandler initializePipeline( .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder()) .addLast("decoder", DECODER) .addLast("idleStateHandler", - new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000)) + new IdleStateHandler(0, 0, conf.sessionTimeoutMs() / 1000)) // NOTE: Chunks are currently guaranteed to be returned in the order of request, but this // would require more logic to guarantee if this were not part of the same event loop. .addLast("handler", channelHandler); @@ -228,7 +228,7 @@ private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client, rpcHandler, conf.maxChunksBeingTransferred(), chunkFetchRequestHandler); return new TransportChannelHandler(client, responseHandler, requestHandler, - conf.connectionTimeoutMs(), separateChunkFetchRequest, closeIdleConnections, this); + conf.sessionTimeoutMs(), separateChunkFetchRequest, closeIdleConnections, this); } public TransportConf getConf() { return conf; } diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java index e53a0c1a0852..3be02cb73cf1 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java @@ -165,8 +165,9 @@ public void 
userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc if (hasInFlightRequests) { String address = getRemoteAddress(ctx.channel()); logger.error("Connection to {} has been quiet for {} ms while there are outstanding " + - "requests. Assuming connection is dead; please adjust spark.network.timeout if " + - "this is wrong.", address, requestTimeoutNs / 1000 / 1000); + "requests. Assuming connection is dead; please adjust spark.{}.io.sessionTimeout " + + "if this is wrong.", address, requestTimeoutNs / 1000 / 1000, + transportContext.getConf().getModuleName()); client.timeOut(); ctx.close(); } else if (closeIdleConnections) { diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java index fd287b022618..2563ddab8f64 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -31,6 +31,7 @@ public class TransportConf { private final String SPARK_NETWORK_IO_MODE_KEY; private final String SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY; private final String SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY; + private final String SPARK_NETWORK_IO_SESSIONTIMEOUT_KEY; private final String SPARK_NETWORK_IO_BACKLOG_KEY; private final String SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY; private final String SPARK_NETWORK_IO_SERVERTHREADS_KEY; @@ -54,6 +55,7 @@ public TransportConf(String module, ConfigProvider conf) { SPARK_NETWORK_IO_MODE_KEY = getConfKey("io.mode"); SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY = getConfKey("io.preferDirectBufs"); SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY = getConfKey("io.connectionTimeout"); + SPARK_NETWORK_IO_SESSIONTIMEOUT_KEY = getConfKey("io.sessionTimeout"); SPARK_NETWORK_IO_BACKLOG_KEY = getConfKey("io.backLog"); SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY = getConfKey("io.numConnectionsPerPeer"); 
SPARK_NETWORK_IO_SERVERTHREADS_KEY = getConfKey("io.serverThreads"); @@ -103,6 +105,13 @@ public int connectionTimeoutMs() { return (int) defaultTimeoutMs; } + /** Session timeout in milliseconds. Default {@link TransportConf#connectionTimeoutMs}. */ + public int sessionTimeoutMs() { + long defaultTimeoutMs = JavaUtils.timeStringAsMs( + conf.get(SPARK_NETWORK_IO_SESSIONTIMEOUT_KEY, connectionTimeoutMs() + "ms")); + return (int) defaultTimeoutMs; + } + /** Number of concurrent connections between two nodes for fetching data. */ public int numConnectionsPerPeer() { return conf.getInt(SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY, 1); diff --git a/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java b/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java index 15a28ba249b8..4a680d923b6d 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java @@ -61,7 +61,8 @@ public class RequestTimeoutIntegrationSuite { @Before public void setUp() throws Exception { Map configMap = new HashMap<>(); - configMap.put("spark.shuffle.io.connectionTimeout", "10s"); + configMap.put("spark.shuffle.io.sessionTimeout", "10s"); + configMap.put("spark.shuffle.io.connectionTimeout", "600s"); conf = new TransportConf("shuffle", new MapConfigProvider(configMap)); defaultManager = new StreamManager() { diff --git a/common/network-common/src/test/java/org/apache/spark/network/client/TransportClientFactorySuite.java b/common/network-common/src/test/java/org/apache/spark/network/client/TransportClientFactorySuite.java index ea0ac51589dc..d6b1b6fa4cc2 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/client/TransportClientFactorySuite.java +++ 
b/common/network-common/src/test/java/org/apache/spark/network/client/TransportClientFactorySuite.java @@ -192,7 +192,7 @@ public void closeIdleConnectionForRequestTimeOut() throws IOException, Interrupt @Override public String get(String name) { - if ("spark.shuffle.io.connectionTimeout".equals(name)) { + if ("spark.shuffle.io.sessionTimeout".equals(name)) { // We should make sure there is enough time for us to observe the channel is active return "1s"; } diff --git a/docs/configuration.md b/docs/configuration.md index 21506e690126..8cc911a55840 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -923,6 +923,27 @@ Apart from these, the following properties are also available, and may be useful 1.1.1 + + spark.shuffle.io.connectionTimeout + value of spark.network.timeout + + (Netty only) Timeout for a shuffle client to establish a connection with the shuffle server. If + time exceeds this setting, the current handshake will be aborted and retried, governed by + spark.shuffle.io.maxRetries during shuffle data transfer or + spark.shuffle.registration.maxAttempts during registration with the external shuffle service. + + 1.2.0 + + + spark.shuffle.io.sessionTimeout + value of spark.shuffle.io.connectionTimeout + + (Netty only) Timeout for established connections between shuffle servers and clients + to be marked as idle. A connection that has been quiet for this long is assumed to be dead + and will be closed, regardless of whether there are still outstanding requests. + + 3.2.0 + spark.shuffle.service.enabled false @@ -1920,7 +1941,8 @@ Apart from these, the following properties are also available, and may be useful Default timeout for all network interactions. This config will be used in place of spark.storage.blockManagerHeartbeatTimeoutMs, - spark.shuffle.io.connectionTimeout, spark.rpc.askTimeout or + spark.shuffle.io.connectionTimeout, spark.shuffle.io.sessionTimeout, + spark.rpc.askTimeout or spark.rpc.lookupTimeout if they are not configured. 1.3.0