From 9f9f2f60ff774db33c83ce82559a258e07e76a6a Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Sat, 19 Dec 2020 01:59:22 +0800 Subject: [PATCH 1/3] [SPARK-33842][Core][Shuffle] Add spark.shuffle.io.sessionTimeout to check an established connection for being idle or dead --- .../spark/network/TransportContext.java | 4 ++-- .../server/TransportChannelHandler.java | 5 ++-- .../spark/network/util/TransportConf.java | 9 +++++++ .../RequestTimeoutIntegrationSuite.java | 3 ++- .../client/TransportClientFactorySuite.java | 2 +- docs/configuration.md | 24 ++++++++++++++++++- 6 files changed, 40 insertions(+), 7 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java index a0de9df1986f..ad69d27e5560 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java +++ b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java @@ -192,7 +192,7 @@ public TransportChannelHandler initializePipeline( .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder()) .addLast("decoder", DECODER) .addLast("idleStateHandler", - new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000)) + new IdleStateHandler(0, 0, conf.sessionTimeoutMs() / 1000)) // NOTE: Chunks are currently guaranteed to be returned in the order of request, but this // would require more logic to guarantee if this were not part of the same event loop. 
.addLast("handler", channelHandler); @@ -228,7 +228,7 @@ private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client, rpcHandler, conf.maxChunksBeingTransferred(), chunkFetchRequestHandler); return new TransportChannelHandler(client, responseHandler, requestHandler, - conf.connectionTimeoutMs(), separateChunkFetchRequest, closeIdleConnections, this); + conf.sessionTimeoutMs(), separateChunkFetchRequest, closeIdleConnections, this); } public TransportConf getConf() { return conf; } diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java index e53a0c1a0852..b43222ca6c71 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java @@ -165,8 +165,9 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc if (hasInFlightRequests) { String address = getRemoteAddress(ctx.channel()); logger.error("Connection to {} has been quiet for {} ms while there are outstanding " + - "requests. Assuming connection is dead; please adjust spark.network.timeout if " + - "this is wrong.", address, requestTimeoutNs / 1000 / 1000); + "requests. 
Assuming connection is dead; please adjust spark.{}.io.sessionTimeout if " + + "this is wrong.", address, requestTimeoutNs / 1000 / 1000, + transportContext.getConf().getModuleName()); client.timeOut(); ctx.close(); } else if (closeIdleConnections) { diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java index fd287b022618..2563ddab8f64 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -31,6 +31,7 @@ public class TransportConf { private final String SPARK_NETWORK_IO_MODE_KEY; private final String SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY; private final String SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY; + private final String SPARK_NETWORK_IO_SESSIONTIMEOUT_KEY; private final String SPARK_NETWORK_IO_BACKLOG_KEY; private final String SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY; private final String SPARK_NETWORK_IO_SERVERTHREADS_KEY; @@ -54,6 +55,7 @@ public TransportConf(String module, ConfigProvider conf) { SPARK_NETWORK_IO_MODE_KEY = getConfKey("io.mode"); SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY = getConfKey("io.preferDirectBufs"); SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY = getConfKey("io.connectionTimeout"); + SPARK_NETWORK_IO_SESSIONTIMEOUT_KEY = getConfKey("io.sessionTimeout"); SPARK_NETWORK_IO_BACKLOG_KEY = getConfKey("io.backLog"); SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY = getConfKey("io.numConnectionsPerPeer"); SPARK_NETWORK_IO_SERVERTHREADS_KEY = getConfKey("io.serverThreads"); @@ -103,6 +105,13 @@ public int connectionTimeoutMs() { return (int) defaultTimeoutMs; } + /** Session timeout in milliseconds. Default {@link TransportConf#connectionTimeoutMs}. 
*/ + public int sessionTimeoutMs() { + long defaultTimeoutMs = JavaUtils.timeStringAsMs( + conf.get(SPARK_NETWORK_IO_SESSIONTIMEOUT_KEY, connectionTimeoutMs() + "ms")); + return (int) defaultTimeoutMs; + } + /** Number of concurrent connections between two nodes for fetching data. */ public int numConnectionsPerPeer() { return conf.getInt(SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY, 1); diff --git a/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java b/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java index 15a28ba249b8..4a680d923b6d 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java @@ -61,7 +61,8 @@ public class RequestTimeoutIntegrationSuite { @Before public void setUp() throws Exception { Map configMap = new HashMap<>(); - configMap.put("spark.shuffle.io.connectionTimeout", "10s"); + configMap.put("spark.shuffle.io.sessionTimeout", "10s"); + configMap.put("spark.shuffle.io.connectionTimeout", "600s"); conf = new TransportConf("shuffle", new MapConfigProvider(configMap)); defaultManager = new StreamManager() { diff --git a/common/network-common/src/test/java/org/apache/spark/network/client/TransportClientFactorySuite.java b/common/network-common/src/test/java/org/apache/spark/network/client/TransportClientFactorySuite.java index ea0ac51589dc..d6b1b6fa4cc2 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/client/TransportClientFactorySuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/client/TransportClientFactorySuite.java @@ -192,7 +192,7 @@ public void closeIdleConnectionForRequestTimeOut() throws IOException, Interrupt @Override public String get(String name) { - if ("spark.shuffle.io.connectionTimeout".equals(name)) { + if 
("spark.shuffle.io.sessionTimeout".equals(name)) { // We should make sure there is enough time for us to observe the channel is active return "1s"; } diff --git a/docs/configuration.md b/docs/configuration.md index 21506e690126..756b3bccea3d 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -923,6 +923,27 @@ Apart from these, the following properties are also available, and may be useful 1.1.1 + + spark.shuffle.io.connectionTimeout + value of spark.network.timeout + + (Netty only) Timeout for a shuffle client to establish the connection with the shuffle server. If + the time exceeds this setting, the current handshake will be aborted and retried within + spark.shuffle.io.maxRetries in shuffle transportation or + spark.shuffle.registration.maxAttempts in registration with the external shuffle service. + + 1.2.0 + + + spark.shuffle.io.sessionTimeout + value of spark.shuffle.io.connectionTimeout + + (Netty only) Timeout for the established connections between shuffle servers and clients + to be marked as idled. The connection will be closed for it is quiet or may be dead whether + there are still outstanding request or not. + + 3.2.0 + spark.shuffle.service.enabled false @@ -1920,7 +1941,8 @@ Apart from these, the following properties are also available, and may be useful Default timeout for all network interactions. This config will be used in place of spark.storage.blockManagerHeartbeatTimeoutMs, - spark.shuffle.io.connectionTimeout, spark.rpc.askTimeout or + spark.shuffle.io.connectionTimeout, spark.shuffle.io.sessionTimeout, + spark.rpc.askTimeout or spark.rpc.lookupTimeout if they are not configured. 
1.3.0 From e51904fc05c4a97966761b7bb4f976580a1db9ab Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Sat, 19 Dec 2020 03:11:13 +0800 Subject: [PATCH 2/3] style --- .../apache/spark/network/server/TransportChannelHandler.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java index b43222ca6c71..3be02cb73cf1 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java @@ -165,8 +165,8 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc if (hasInFlightRequests) { String address = getRemoteAddress(ctx.channel()); logger.error("Connection to {} has been quiet for {} ms while there are outstanding " + - "requests. Assuming connection is dead; please adjust spark.{}.io.sessionTimeout if " + - "this is wrong.", address, requestTimeoutNs / 1000 / 1000, + "requests. Assuming connection is dead; please adjust spark.{}.io.sessionTimeout " + + "if this is wrong.", address, requestTimeoutNs / 1000 / 1000, transportContext.getConf().getModuleName()); client.timeOut(); ctx.close(); From efc292a8d182e7ea960da1084099c271c9362dd1 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Sun, 20 Dec 2020 00:43:15 +0800 Subject: [PATCH 3/3] nit --- docs/configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration.md b/docs/configuration.md index 756b3bccea3d..8cc911a55840 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -940,7 +940,7 @@ Apart from these, the following properties are also available, and may be useful (Netty only) Timeout for the established connections between shuffle servers and clients to be marked as idled. 
The connection will be closed for it is quiet or may be dead whether - there are still outstanding request or not. + there are still outstanding requests or not. 3.2.0